"""Simplified GeoPoison-RAG attack simulation.

Realistic threat model (matching GeoPoison-RAG Phase 1):
  - Attacker has shadow queries approximating target query distribution.
  - Attacker has access to document embeddings.
  - Attacker builds bipartite query-document graph using COSINE SIMILARITY
    (their model of how retrieval works).
  - Attacker computes Fiedler vector and places adversarial doc at the
    spectral-optimal position in document space.

Defense argument:
  - Baseline (cosine sim): attacker's model is correct → high ASR.
  - Multi-manifold (R(q,d)): attacker's model is wrong because R ≠ cosine
    → lower ASR.
"""

import numpy as np
import torch
from scipy.sparse.linalg import eigsh
from sklearn.metrics.pairwise import cosine_similarity

from multi_manifold_retrieval.evaluation.spectral_analysis import compute_document_laplacian

# Minimum number of keyword-matched documents required to run the simulation.
_MIN_DOMAIN_DOCS = 5


def select_domain_documents(
    passages: list[str],
    keywords: list[str],
    max_docs: int = 200,
) -> tuple[list[int], list[str]]:
    """Select documents belonging to a target domain by keyword matching.

    Matching is a case-insensitive substring test: a passage is selected when
    any keyword occurs anywhere in it.

    Args:
        passages: Candidate passage texts.
        keywords: Domain keywords; compared case-insensitively.
        max_docs: Maximum number of passages to return. Scanning stops as soon
            as this many matches were found. Non-positive values select nothing.

    Returns:
        ``(indices, texts)``: positions of the selected passages in
        ``passages`` and the corresponding texts, in corpus order.
    """
    if max_docs <= 0:
        return [], []

    # Passages are lower-cased before matching, so the keywords must be too;
    # otherwise a keyword such as "COVID" could never match.
    needles = [kw.lower() for kw in keywords]

    indices: list[int] = []
    texts: list[str] = []
    for i, text in enumerate(passages):
        haystack = text.lower()
        if any(needle in haystack for needle in needles):
            indices.append(i)
            texts.append(text)
            if len(indices) >= max_docs:
                break
    return indices, texts


def _fiedler_pair(laplacian) -> tuple[np.ndarray, float]:
    """Return the eigenvector/eigenvalue of the 2nd-smallest Laplacian eigenvalue.

    Raises:
        ValueError: If the Laplacian has fewer than 3 rows; a second
            eigenpair cannot be requested from ``eigsh`` for smaller graphs.
    """
    n = laplacian.shape[0]
    if n < 3:
        raise ValueError(
            f"Fiedler vector needs a graph with at least 3 nodes, got {n}."
        )
    k = min(3, n - 1)
    eigenvalues, eigenvectors = eigsh(laplacian, k=k, which="SM")
    # eigsh does not promise a particular ordering for which="SM"; sort
    # explicitly and take the second-smallest eigenpair.
    second = np.argsort(eigenvalues)[1]
    return eigenvectors[:, second], float(eigenvalues[second])


def _fiedler_weighted_centroid(
    doc_component: np.ndarray,
    doc_embs: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Return the L2-normalised centroid of ``doc_embs`` weighted by |component|.

    The weights are the absolute Fiedler entries normalised to sum to one
    (the eigenvector's sign is arbitrary, hence the absolute value). A zero
    centroid is returned unnormalised.
    """
    weights = np.abs(doc_component)
    weights = weights / (weights.sum() + 1e-12)
    centroid = (weights[:, None] * doc_embs).sum(axis=0)
    norm = np.linalg.norm(centroid)
    if norm > 0:
        centroid = centroid / norm
    return centroid, weights


def build_bipartite_fiedler_placement(
    query_embs: np.ndarray,
    doc_embs: np.ndarray,
    t_nn: int = 20,
) -> tuple[np.ndarray, dict]:
    """GeoPoison-RAG Phase 1: bipartite spectral placement (cosine-based).

    The attacker:
      1. Builds bipartite query-document graph using cosine similarity.
      2. Computes Fiedler vector of the normalized Laplacian.
      3. Extracts document component of Fiedler vector.
      4. Places adversarial doc at Fiedler-weighted centroid of documents.

    The placement is in DOCUMENT SPACE — the attacker optimizes where to place
    a document, guided by the query-document spectral structure. But the
    attacker assumes retrieval = cosine similarity.

    Args:
        query_embs: Shadow-query embeddings, one row per query.
        doc_embs: Document embeddings, one row per document.
        t_nn: Number of nearest documents kept per query when sparsifying the
            graph. Clamped to the range ``[1, nd - 1]`` (``1`` when there is a
            single document).

    Returns:
        ``(adv_embedding, info)``: the L2-normalised adversarial embedding and
        a dict of placement diagnostics.

    Raises:
        ValueError: If either input is empty or the graph has fewer than
            three nodes in total.
    """
    nq = query_embs.shape[0]
    nd = doc_embs.shape[0]
    n = nq + nd
    if nq == 0 or nd == 0 or n < 3:
        raise ValueError(
            "Bipartite placement needs at least one query, one document and "
            f"three graph nodes in total (got {nq} queries, {nd} documents)."
        )

    # Cosine similarity between queries and documents (attacker's model)
    S = cosine_similarity(query_embs, doc_embs)  # (nq, nd)

    # Sparsify: keep top-t per query. t must be >= 1: with t == 0 the slice
    # [-0:] would silently keep *every* column instead of none.
    t = max(1, min(t_nn, nd - 1))
    top_idx = np.argpartition(S, -t, axis=1)[:, -t:]
    # Negative similarities are not valid edge weights: they can drive node
    # degrees negative and turn D^{-1/2} into NaN, so clip them to zero.
    kept = np.clip(np.take_along_axis(S, top_idx, axis=1), 0.0, None)
    S_sparse = np.zeros_like(S)
    np.put_along_axis(S_sparse, top_idx, kept, axis=1)

    # Build bipartite adjacency: A = [[0, S], [S^T, 0]]
    A = np.zeros((n, n))
    A[:nq, nq:] = S_sparse
    A[nq:, :nq] = S_sparse.T

    # Normalized Laplacian: L = I - D^{-1/2} A D^{-1/2}
    degrees = A.sum(axis=1)
    degrees[degrees == 0] = 1.0  # isolated nodes: avoid division by zero
    inv_sqrt_deg = 1.0 / np.sqrt(degrees)
    # Row/column scaling by broadcasting equals the diagonal matrix products
    # without materialising D^{-1/2} or paying for dense matmuls.
    L = np.eye(n) - A * inv_sqrt_deg[:, None] * inv_sqrt_deg[None, :]

    # Fiedler vector (2nd smallest eigenvector)
    fiedler_vec, fiedler_val = _fiedler_pair(L)

    # The first nq entries belong to query nodes; the rest to documents.
    adv_embedding, weights = _fiedler_weighted_centroid(fiedler_vec[nq:], doc_embs)

    adv_row = adv_embedding.reshape(1, -1)
    info = {
        "method": "bipartite_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
        "weight_entropy": float(-np.sum(weights * np.log(weights + 1e-12))),
        "max_weight": float(weights.max()),
        "adv_mean_cos_to_queries": float(
            cosine_similarity(adv_row, query_embs).mean()
        ),
        "adv_mean_cos_to_docs": float(
            cosine_similarity(adv_row, doc_embs).mean()
        ),
    }
    return adv_embedding, info


def compute_doconly_fiedler_placement(doc_embs: np.ndarray) -> tuple[np.ndarray, dict]:
    """Document-only Fiedler placement (no query access).

    Weaker attacker that only has document embeddings. Uses document-space
    Laplacian L_D directly.

    Args:
        doc_embs: Document embeddings, one row per document.

    Returns:
        ``(adv_embedding, info)``. With fewer than three documents the plain
        (normalised) centroid is returned and ``info["method"]`` is
        ``"centroid_fallback"``; otherwise the Fiedler-weighted centroid is
        returned together with the Fiedler eigenvalue.

    Raises:
        ValueError: If ``doc_embs`` contains no documents.
    """
    n = doc_embs.shape[0]
    if n == 0:
        raise ValueError("doc_embs must contain at least one document.")

    if n < 3:
        centroid = doc_embs.mean(axis=0)
        norm = np.linalg.norm(centroid)
        # Guard the degenerate zero centroid instead of dividing by zero.
        if norm > 0:
            centroid = centroid / norm
        return centroid, {"method": "centroid_fallback"}

    L_D, _ = compute_document_laplacian(doc_embs)
    fiedler_vec, fiedler_val = _fiedler_pair(L_D)
    adv_embedding, _ = _fiedler_weighted_centroid(fiedler_vec, doc_embs)

    return adv_embedding, {
        "method": "doconly_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
    }


def compute_asr_threshold(
    query_embeddings: torch.Tensor,
    corpus_embeddings: torch.Tensor,
    adv_embedding: torch.Tensor,
    operator,
    top_k: int = 10,
    device: str = "cpu",
    batch_size: int = 50,
) -> tuple[float, dict]:
    """Compute ASR@k using per-query threshold (oracle-style).

    For each query, the k-th highest corpus score is the threshold. Attack
    succeeds if the adversarial doc's score >= threshold. Mirrors
    gp_rag/plan_single.py oracle check.

    Args:
        query_embeddings: Query embeddings, one row per query.
        corpus_embeddings: Embeddings of the legitimate corpus documents.
        adv_embedding: Embedding of the adversarial document (1-D).
        operator: Scoring model. It is switched to eval mode and must support
            ``operator(q_batch, d_batch)`` for row-paired scores (assumed to
            yield one score per row) and
            ``operator.compute_pairwise(q_batch, corpus)`` for the
            (queries x corpus) score matrix.
        top_k: Retrieval depth. When the corpus holds fewer than ``top_k``
            documents the lowest corpus score is used as the threshold, which
            keeps the margins finite.
        device: Device on which scoring runs.
        batch_size: Number of queries scored per batch.

    Returns:
        ``(asr, info)``: the fraction of queries for which the attack
        succeeds and summary statistics of the score margins
        (adversarial score minus threshold).

    Raises:
        ValueError: If there are no queries or corpus documents, or if
            ``top_k`` / ``batch_size`` is not positive.
    """
    num_queries = query_embeddings.shape[0]
    num_docs = corpus_embeddings.shape[0]
    if num_queries == 0:
        raise ValueError("query_embeddings must contain at least one query.")
    if num_docs == 0:
        raise ValueError("corpus_embeddings must contain at least one document.")
    if top_k < 1:
        raise ValueError(f"top_k must be positive, got {top_k}.")
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}.")

    # torch.topk raises when k exceeds the size of the dimension.
    k = min(top_k, num_docs)

    corpus_emb = corpus_embeddings.to(device)
    adv_emb = adv_embedding.to(device)

    operator.eval()
    successes = 0
    margin_chunks = []

    with torch.no_grad():
        for start in range(0, num_queries, batch_size):
            q_batch = query_embeddings[start:start + batch_size].to(device)
            bs = q_batch.shape[0]

            # Score adversarial document against every query in the batch
            adv_expanded = adv_emb.unsqueeze(0).expand(bs, -1)
            adv_scores = operator(q_batch, adv_expanded).reshape(bs)

            # Score corpus documents
            corpus_scores = operator.compute_pairwise(q_batch, corpus_emb)

            # k-th highest corpus score = threshold
            thresholds = torch.topk(corpus_scores, k, dim=1).values[:, -1]

            successes += int((adv_scores >= thresholds).sum().item())
            # Subtract in float64 so the margins do not depend on score dtype.
            margin_chunks.append((adv_scores.double() - thresholds.double()).cpu())

    asr = successes / num_queries
    margins_arr = torch.cat(margin_chunks).numpy()

    info = {
        "mean_margin": float(margins_arr.mean()),
        "median_margin": float(np.median(margins_arr)),
        "p25_margin": float(np.percentile(margins_arr, 25)),
        "fraction_positive_margin": float((margins_arr >= 0).mean()),
    }
    return asr, info


def _relative_reduction_pct(base: float, mm: float) -> float:
    """Return the percentage drop from ``base`` to ``mm``; NaN if ``base`` is 0.

    With a zero baseline the relative reduction is undefined; reporting NaN
    avoids printing a misleading "100%" (0 -> 0) or an astronomically large
    negative number (0 -> positive).
    """
    if base <= 0:
        return float("nan")
    return (1 - mm / base) * 100


def _evaluate_attack(
    title: str,
    adv_embedding: torch.Tensor,
    query_embeddings: torch.Tensor,
    corpus: torch.Tensor,
    baseline_operator,
    operator,
    top_k: int,
    device: str,
) -> tuple[float, dict, float, dict]:
    """Measure and print ASR@k of one adversarial doc under both operators."""
    print(title, flush=True)
    asr_base, base_info = compute_asr_threshold(
        query_embeddings, corpus, adv_embedding, baseline_operator, top_k, device
    )
    print(
        f" Baseline ASR@{top_k}: {asr_base:.4f} "
        f"(mean margin: {base_info['mean_margin']:.4f})",
        flush=True,
    )
    asr_mm, mm_info = compute_asr_threshold(
        query_embeddings, corpus, adv_embedding, operator, top_k, device
    )
    print(
        f" Multi-manifold ASR@{top_k}: {asr_mm:.4f} "
        f"(mean margin: {mm_info['mean_margin']:.4f})",
        flush=True,
    )
    return asr_base, base_info, asr_mm, mm_info


def run_attack_simulation(
    encoder,
    operator,
    baseline_operator,
    passages: list[str],
    passage_embeddings_torch: torch.Tensor,
    target_query_texts: list[str],
    medical_keywords: list[str],
    top_k: int = 10,
    max_domain_docs: int = 200,
    device: str = "cpu",
) -> dict:
    """Run GeoPoison-RAG attack simulation.

    Tests two attacker models:
      1. Bipartite Fiedler (realistic): attacker has shadow queries + docs,
         builds cosine-based bipartite graph, optimizes in document space.
      2. Doc-only Fiedler (weaker): attacker has only document embeddings.

    Both assume cosine similarity governs retrieval. The defense breaks this
    assumption via the cross-manifold operator R.

    Args:
        encoder: Object exposing ``encode_queries(texts, show_progress=...)``
            that returns the query embeddings as a tensor.
        operator: Multi-manifold scoring operator (the defense).
        baseline_operator: Baseline scoring operator.
        passages: Corpus passage texts.
        passage_embeddings_torch: Passage embeddings, indexable by passage
            position (row i is assumed to embed ``passages[i]``).
        target_query_texts: The attacker's shadow queries.
        medical_keywords: Keywords defining the target domain.
        top_k: Retrieval depth used for ASR@k.
        max_domain_docs: Maximum number of domain documents to attack.
        device: Device on which scoring runs.

    Returns:
        A dict with per-attack ASR values, margin statistics and placement
        diagnostics, or ``{"error": "insufficient domain documents"}`` when
        fewer than five domain documents match the keywords.
    """
    print("\n=== Attack Simulation ===", flush=True)

    # Step 1: Select target domain documents
    domain_indices, _ = select_domain_documents(
        passages, medical_keywords, max_domain_docs
    )
    print(f"Selected {len(domain_indices)} domain documents.", flush=True)

    if len(domain_indices) < _MIN_DOMAIN_DOCS:
        print("Warning: Too few domain documents found.", flush=True)
        return {"error": "insufficient domain documents"}

    domain_corpus = passage_embeddings_torch[domain_indices]
    # detach() keeps the NumPy conversion valid for tensors that track grads.
    domain_embs_np = domain_corpus.detach().cpu().numpy()

    # Step 2: Encode target queries (attacker's shadow queries)
    print(f"Encoding {len(target_query_texts)} target queries...", flush=True)
    query_embeddings = encoder.encode_queries(target_query_texts, show_progress=False)
    q_np = query_embeddings.detach().cpu().numpy()

    # Step 3a: Bipartite Fiedler placement (realistic attacker)
    print("\nComputing bipartite Fiedler placement (attacker has shadow queries)...", flush=True)
    adv_bipartite_np, bp_info = build_bipartite_fiedler_placement(
        q_np, domain_embs_np, t_nn=min(20, len(domain_indices) - 1)
    )
    adv_bipartite = torch.tensor(adv_bipartite_np, dtype=torch.float32)
    print(f" Fiedler eigenvalue: {bp_info['fiedler_eigenvalue']:.6f}", flush=True)
    print(f" Adv mean cos to queries: {bp_info['adv_mean_cos_to_queries']:.4f}", flush=True)
    print(f" Adv mean cos to docs: {bp_info['adv_mean_cos_to_docs']:.4f}", flush=True)

    # Step 3b: Doc-only Fiedler placement (weaker attacker)
    print("\nComputing doc-only Fiedler placement (no query access)...", flush=True)
    adv_doconly_np, do_info = compute_doconly_fiedler_placement(domain_embs_np)
    adv_doconly = torch.tensor(adv_doconly_np, dtype=torch.float32)

    # Step 4: Measure ASR for bipartite attack
    asr_bp_base, bp_base_info, asr_bp_mm, bp_mm_info = _evaluate_attack(
        "\n--- Bipartite Fiedler Attack (realistic GeoPoison-RAG) ---",
        adv_bipartite, query_embeddings, domain_corpus,
        baseline_operator, operator, top_k, device,
    )

    # Step 5: Measure ASR for doc-only attack
    asr_do_base, do_base_info, asr_do_mm, do_mm_info = _evaluate_attack(
        "\n--- Doc-only Fiedler Attack (weaker attacker) ---",
        adv_doconly, query_embeddings, domain_corpus,
        baseline_operator, operator, top_k, device,
    )

    # Summary
    results = {
        "bipartite_attack": {
            "baseline_asr": asr_bp_base,
            "multi_manifold_asr": asr_bp_mm,
            "baseline_margins": bp_base_info,
            "multi_manifold_margins": bp_mm_info,
            "placement_info": bp_info,
        },
        "doconly_attack": {
            "baseline_asr": asr_do_base,
            "multi_manifold_asr": asr_do_mm,
            "baseline_margins": do_base_info,
            "multi_manifold_margins": do_mm_info,
            "placement_info": do_info,
        },
        "num_domain_docs": len(domain_indices),
        "num_target_queries": len(target_query_texts),
        "top_k": top_k,
        # For backward compat with summary printing
        "baseline_asr": asr_bp_base,
        "multi_manifold_asr": asr_bp_mm,
    }

    print("\n=== Attack Results Summary ===", flush=True)
    print(" Baseline Multi-Manifold Reduction", flush=True)
    print(
        f" Bipartite (realistic): {asr_bp_base:.4f} {asr_bp_mm:.4f}"
        f" {_relative_reduction_pct(asr_bp_base, asr_bp_mm):.1f}%",
        flush=True,
    )
    print(
        f" Doc-only (weaker): {asr_do_base:.4f} {asr_do_mm:.4f}"
        f" {_relative_reduction_pct(asr_do_base, asr_do_mm):.1f}%",
        flush=True,
    )

    return results