#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SafeRAG Real Embedding Test

Load data -> Generate real embeddings using sentence-transformers -> Build index -> Retrieve
"""

import os
import sys
import time
import traceback

import numpy as np

# Make the directory containing this file importable so the project modules
# used below (data_processing, retriever, generator) resolve when the file is
# run directly as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Keep `from <this module> import *` from re-exporting the test_* functions
# into another test module, where pytest would collect them a second time.
__all__ = ["cosine_similarity", "main"]


def cosine_similarity(vec_a, vec_b) -> float:
    """Return the cosine similarity of two 1-D vectors.

    Args:
        vec_a: First vector (anything ``np.dot`` / ``np.linalg.norm`` accept).
        vec_b: Second vector, same length as ``vec_a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)`` as a Python float, or ``0.0`` when either
        vector has zero norm (the ratio is undefined there and would otherwise
        be NaN, which breaks sorting by similarity).
    """
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denominator == 0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / denominator)


def test_real_embedding_pipeline() -> bool:
    """Test the complete pipeline with real embeddings.

    Runs six steps in order: load and preprocess the knowledge base, embed the
    passages, build a FAISS index, retrieve for a fixed list of queries,
    compute query/passage cosine similarities by hand, and generate an answer.
    Progress is printed to stdout.

    Returns:
        True if every step completed without raising, False otherwise (the
        exception and its traceback are printed).
    """
    print("SafeRAG Real Embedding Pipeline Test")
    print("=" * 50)

    # Project imports are done lazily inside the try block so that a missing
    # or broken module is reported as a failed run instead of an import crash.
    try:
        # Step 1: Load data
        print("\n1. Loading data...")
        from data_processing import DataLoader, Preprocessor

        loader = DataLoader()
        preprocessor = Preprocessor()

        # Load knowledge base
        kb_passages = loader.get_knowledge_base()
        print(f" ✓ Loaded {len(kb_passages)} knowledge base passages")

        # Print every loaded passage, numbered from 1.
        for number, passage in enumerate(kb_passages, start=1):
            print(f" [{number}] {passage}")

        # Preprocess passages
        processed_passages = preprocessor.preprocess_passages(kb_passages)
        print(f" ✓ Preprocessed {len(processed_passages)} passages")

        # Step 2: Generate real embeddings
        print("\n2. Generating real embeddings with sentence-transformers...")
        from retriever import Embedder

        # Use a smaller model for faster testing
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")
        print(f" ✓ Loaded embedding model: {embedder.model_name}")
        print(f" ✓ Embedding dimension: {embedder.get_dimension()}")

        # Extract text from processed passages
        passage_texts = [p['text'] for p in processed_passages]

        # Generate embeddings
        start_time = time.time()
        embeddings = embedder.encode_passages(passage_texts)
        embedding_time = time.time() - start_time

        print(f" ✓ Generated {embeddings.shape[0]} embeddings in {embedding_time:.3f}s")
        print(f" ✓ Embedding shape: {embeddings.shape}")
        print(f" ✓ Embedding type: {type(embeddings)}")

        # Show embedding statistics
        print(" ✓ Embedding stats:")
        print(f" - Mean: {np.mean(embeddings):.4f}")
        print(f" - Std: {np.std(embeddings):.4f}")
        print(f" - Min: {np.min(embeddings):.4f}")
        print(f" - Max: {np.max(embeddings):.4f}")

        # Step 3: Build FAISS index
        print("\n3. Building FAISS index...")
        from retriever import FAISSIndex

        index = FAISSIndex(embedder.get_dimension())

        start_time = time.time()
        index.build_index(embeddings, passage_texts)
        build_time = time.time() - start_time

        print(f" ✓ Built FAISS index in {build_time:.3f}s")
        print(f" ✓ Index contains {index.index.ntotal} vectors")

        # Step 4: Test retrieval
        print("\n4. Testing retrieval...")
        from retriever import Retriever

        retriever = Retriever(embedder, index, None)  # No reranker for simplicity

        test_queries = [
            "What is machine learning?",
            "Tell me about the capital of France",
            "How does Python work?",
            "What is artificial intelligence?",
        ]

        for query in test_queries:
            print(f"\n Query: '{query}'")

            start_time = time.time()
            results = retriever.retrieve_single(query, k=3)
            retrieval_time = time.time() - start_time

            print(f" ✓ Retrieved {len(results)} passages in {retrieval_time:.3f}s")

            for rank, result in enumerate(results, start=1):
                print(f" [{rank}] Score: {result['score']:.4f}")
                print(f" Text: {result['text'][:100]}...")

        # Step 5: Test similarity calculation
        print("\n5. Testing similarity calculation...")

        # Test query-passage similarity
        query = "What is machine learning?"
        query_embedding = embedder.encode_queries([query])[0]

        print(f" Query: '{query}'")
        print(f" Query embedding shape: {query_embedding.shape}")

        # Score the query against every passage embedding; each entry is
        # (passage position, similarity, passage text).
        similarities = [
            (position, cosine_similarity(query_embedding, passage_embedding),
             passage_texts[position])
            for position, passage_embedding in enumerate(embeddings)
        ]

        # Sort by similarity, best match first
        similarities.sort(key=lambda entry: entry[1], reverse=True)

        print(f" ✓ Calculated similarities with {len(similarities)} passages")
        print(" Top 3 most similar passages:")
        for rank, (_position, sim, text) in enumerate(similarities[:3], start=1):
            print(f" [{rank}] Similarity: {sim:.4f}")
            print(f" Text: {text[:80]}...")

        # Step 6: Test generation
        print("\n6. Testing generation...")
        from generator import SafeGenerator, PromptTemplates

        # NOTE(review): `templates` is not used below; it is kept so that a
        # failure to import or construct PromptTemplates still fails this run.
        templates = PromptTemplates()
        generator = SafeGenerator(None, None, 0.3, 0.7)  # Simplified version

        test_query = "What is machine learning?"
        retrieved_passages = retriever.retrieve_single(test_query, k=3)

        print(f" Query: '{test_query}'")
        print(f" Retrieved {len(retrieved_passages)} passages")

        # Generate answer
        start_time = time.time()
        result = generator.generate_with_strategy(test_query, retrieved_passages)
        generation_time = time.time() - start_time

        print(f" ✓ Generated answer in {generation_time:.3f}s")
        print(f" Answer: {result['answer'][:200]}...")
        print(f" Risk Score: {result['risk_score']:.3f}")
        print(f" Strategy: {result['strategy']}")

        print("\n" + "=" * 50)
        print("🎉 Real embedding pipeline test completed successfully!")
        print("\nPipeline Summary:")
        print(f"- Data Loading: {len(kb_passages)} passages")
        print(f"- Real Embedding Generation: {embeddings.shape[0]} vectors ({embeddings.shape[1]}D)")
        print(f"- Index Building: {index.index.ntotal} indexed vectors")
        print(f"- Retrieval: {len(test_queries)} test queries")
        print("- Similarity Calculation: Cosine similarity with all passages")
        print("- Generation: Risk-aware answer generation")

        return True

    except Exception as e:
        # Top-level boundary of the script: report the failure and let the
        # caller turn the False result into a non-zero exit code.
        print(f"\n❌ Pipeline test failed: {e}")
        traceback.print_exc()
        return False


def test_embedding_quality() -> bool:
    """Test embedding quality and properties.

    Embeds five fixed sentences, prints the cosine similarity of the two
    related pairs, then prints per-embedding norms and summary statistics of
    all embedding values.

    Returns:
        True if everything ran without raising, False otherwise (the
        exception and its traceback are printed).
    """
    print("\n" + "=" * 50)
    print("Testing Embedding Quality")
    print("=" * 50)

    try:
        from retriever import Embedder

        # Initialize embedder
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")

        # Test texts
        test_texts = [
            "Machine learning is a subset of artificial intelligence",
            "The capital of France is Paris",
            "Python is a programming language",
            "Machine learning algorithms learn from data",  # Similar to first
            "Paris is the capital city of France",  # Similar to second
        ]

        print("1. Generating embeddings for test texts...")
        embeddings = embedder.encode(test_texts)
        print(f" ✓ Generated {embeddings.shape[0]} embeddings")

        print("\n2. Testing similarity between related texts...")

        # (index of first text, index of second text, label)
        pairs = [
            (0, 3, "Machine learning texts"),
            (1, 4, "France/Paris texts"),
        ]

        for first, second, description in pairs:
            sim = cosine_similarity(embeddings[first], embeddings[second])
            print(f" {description}: {sim:.4f}")
            print(f" Text 1: {test_texts[first]}")
            print(f" Text 2: {test_texts[second]}")

        print("\n3. Testing embedding properties...")

        # Print each embedding's L2 norm so it is visible whether the
        # embeddings are normalized.
        norms = [np.linalg.norm(emb) for emb in embeddings]
        print(f" ✓ Embedding norms: {[f'{n:.4f}' for n in norms]}")

        # Check embedding statistics
        all_embeddings = embeddings.flatten()
        print(" ✓ All embedding values:")
        print(f" - Mean: {np.mean(all_embeddings):.4f}")
        print(f" - Std: {np.std(all_embeddings):.4f}")
        print(f" - Min: {np.min(all_embeddings):.4f}")
        print(f" - Max: {np.max(all_embeddings):.4f}")

        print("\n✅ Embedding quality test completed!")
        return True

    except Exception as e:
        # Top-level boundary of the script: report and signal failure.
        print(f"\n❌ Embedding quality test failed: {e}")
        traceback.print_exc()
        return False


def main() -> bool:
    """Run all tests.

    Both tests are always executed, even if the first one fails.

    Returns:
        True only if both tests returned a truthy result.
    """
    print("SafeRAG Real Embedding Test Suite")
    print("=" * 60)

    # Evaluate both before combining so a failure in the first test does not
    # skip the second.
    quality_ok = test_embedding_quality()
    pipeline_ok = test_real_embedding_pipeline()
    success = bool(quality_ok and pipeline_ok)

    print("\n" + "=" * 60)
    if success:
        print("🎉 All real embedding tests passed!")
        print("\nThe system can now:")
        print("1. ✅ Load data from knowledge base")
        print("2. ✅ Generate real embeddings using sentence-transformers")
        print("3. ✅ Build FAISS index with real embeddings")
        print("4. ✅ Retrieve relevant passages using real similarity")
        print("5. ✅ Calculate cosine similarity between queries and passages")
        print("6. ✅ Generate answers based on retrieved passages")
        print("7. ✅ Assess embedding quality and properties")
    else:
        print("❌ Some tests failed. Please check the errors above.")

    return success


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)