#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SafeRAG Real Embedding Test
Load data -> Generate real embeddings using sentence-transformers -> Build index -> Retrieve
"""
import os
import sys
import time

import numpy as np

# Put this file's directory on sys.path so the project modules imported
# inside the tests below (data_processing, retriever, generator) can be
# resolved from it. The membership check keeps repeated imports or reloads
# of this module from stacking duplicate entries onto sys.path.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
if _THIS_DIR not in sys.path:
    sys.path.append(_THIS_DIR)
def _rank_by_cosine(query_vector, passage_matrix):
    """Rank passage rows by cosine similarity to the query, best first.

    Args:
        query_vector: 1-D array-like query embedding.
        passage_matrix: 2-D array-like with one passage embedding per row.

    Returns:
        list[tuple[int, float]]: ``(row_index, similarity)`` pairs sorted by
        descending similarity. Ties keep their original row order. A row (or
        query) with zero norm scores 0.0 instead of producing NaN.
    """
    query = np.asarray(query_vector, dtype=float)
    matrix = np.asarray(passage_matrix, dtype=float)
    if matrix.size == 0:
        return []

    denominators = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
    scores = np.zeros(matrix.shape[0], dtype=float)
    # Only divide where the denominator is non-zero; the rest stay at 0.0.
    np.divide(matrix @ query, denominators, out=scores, where=denominators > 0)

    # Stable sort on the negated scores == descending order, ties in row order.
    order = np.argsort(-scores, kind="stable")
    return [(int(row), float(scores[row])) for row in order]


def test_real_embedding_pipeline():
    """Run the end-to-end SafeRAG pipeline against real embeddings.

    Steps: load and preprocess the knowledge base, embed the passages, build
    the FAISS index, retrieve for a few sample queries, rank all passages by
    cosine similarity to one query, and generate a risk-scored answer.

    Project modules are imported inside the ``try`` so that a missing or
    broken dependency is reported as a failed run instead of crashing.

    Returns:
        bool: True when every step completes, False if any step raises (the
        error message and traceback are printed).
    """
    print("SafeRAG Real Embedding Pipeline Test")
    print("=" * 50)

    try:
        # Step 1: Load data
        print("\n1. Loading data...")
        from data_processing import DataLoader, Preprocessor

        loader = DataLoader()
        preprocessor = Preprocessor()

        # Load knowledge base
        kb_passages = loader.get_knowledge_base()
        print(f" ✓ Loaded {len(kb_passages)} knowledge base passages")

        # List every loaded passage
        for position, passage in enumerate(kb_passages, start=1):
            print(f" [{position}] {passage}")

        # Preprocess passages
        processed_passages = preprocessor.preprocess_passages(kb_passages)
        print(f" ✓ Preprocessed {len(processed_passages)} passages")

        # Step 2: Generate real embeddings
        print("\n2. Generating real embeddings with sentence-transformers...")
        from retriever import Embedder

        # Use a smaller model for faster testing
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")
        print(f" ✓ Loaded embedding model: {embedder.model_name}")
        print(f" ✓ Embedding dimension: {embedder.get_dimension()}")

        # Extract text from processed passages
        passage_texts = [p['text'] for p in processed_passages]

        # Generate embeddings; perf_counter is monotonic, so the measured
        # duration cannot be skewed by system clock adjustments.
        start_time = time.perf_counter()
        embeddings = embedder.encode_passages(passage_texts)
        embedding_time = time.perf_counter() - start_time

        print(f" ✓ Generated {embeddings.shape[0]} embeddings in {embedding_time:.3f}s")
        print(f" ✓ Embedding shape: {embeddings.shape}")
        print(f" ✓ Embedding type: {type(embeddings)}")

        # Show embedding statistics
        print(" ✓ Embedding stats:")
        print(f" - Mean: {np.mean(embeddings):.4f}")
        print(f" - Std: {np.std(embeddings):.4f}")
        print(f" - Min: {np.min(embeddings):.4f}")
        print(f" - Max: {np.max(embeddings):.4f}")

        # Step 3: Build FAISS index
        print("\n3. Building FAISS index...")
        from retriever import FAISSIndex

        index = FAISSIndex(embedder.get_dimension())
        start_time = time.perf_counter()
        index.build_index(embeddings, passage_texts)
        build_time = time.perf_counter() - start_time

        print(f" ✓ Built FAISS index in {build_time:.3f}s")
        print(f" ✓ Index contains {index.index.ntotal} vectors")

        # Step 4: Test retrieval
        print("\n4. Testing retrieval...")
        from retriever import Retriever

        retriever = Retriever(embedder, index, None)  # No reranker for simplicity

        test_queries = [
            "What is machine learning?",
            "Tell me about the capital of France",
            "How does Python work?",
            "What is artificial intelligence?",
        ]

        for query in test_queries:
            print(f"\n Query: '{query}'")
            start_time = time.perf_counter()
            results = retriever.retrieve_single(query, k=3)
            retrieval_time = time.perf_counter() - start_time

            print(f" ✓ Retrieved {len(results)} passages in {retrieval_time:.3f}s")
            for rank, result in enumerate(results, start=1):
                print(f" [{rank}] Score: {result['score']:.4f}")
                print(f" Text: {result['text'][:100]}...")

        # Step 5: Test similarity calculation
        print("\n5. Testing similarity calculation...")

        # Test query-passage similarity
        query = "What is machine learning?"
        query_embedding = embedder.encode_queries([query])[0]
        print(f" Query: '{query}'")
        print(f" Query embedding shape: {query_embedding.shape}")

        # Cosine similarity against every passage, already sorted best-first
        similarities = [
            (idx, score, passage_texts[idx])
            for idx, score in _rank_by_cosine(query_embedding, embeddings)
        ]

        print(f" ✓ Calculated similarities with {len(similarities)} passages")
        print(" Top 3 most similar passages:")
        for rank, (_, sim, text) in enumerate(similarities[:3], start=1):
            print(f" [{rank}] Similarity: {sim:.4f}")
            print(f" Text: {text[:80]}...")

        # Step 6: Test generation
        print("\n6. Testing generation...")
        from generator import SafeGenerator, PromptTemplates

        # Instantiated only as a smoke check; the instance itself is unused.
        PromptTemplates()
        generator = SafeGenerator(None, None, 0.3, 0.7)  # Simplified version

        test_query = "What is machine learning?"
        retrieved_passages = retriever.retrieve_single(test_query, k=3)
        print(f" Query: '{test_query}'")
        print(f" Retrieved {len(retrieved_passages)} passages")

        # Generate answer
        start_time = time.perf_counter()
        result = generator.generate_with_strategy(test_query, retrieved_passages)
        generation_time = time.perf_counter() - start_time

        print(f" ✓ Generated answer in {generation_time:.3f}s")
        print(f" Answer: {result['answer'][:200]}...")
        print(f" Risk Score: {result['risk_score']:.3f}")
        print(f" Strategy: {result['strategy']}")

        print("\n" + "=" * 50)
        print("🎉 Real embedding pipeline test completed successfully!")
        print("\nPipeline Summary:")
        print(f"- Data Loading: {len(kb_passages)} passages")
        print(f"- Real Embedding Generation: {embeddings.shape[0]} vectors ({embeddings.shape[1]}D)")
        print(f"- Index Building: {index.index.ntotal} indexed vectors")
        print(f"- Retrieval: {len(test_queries)} test queries")
        print("- Similarity Calculation: Cosine similarity with all passages")
        print("- Generation: Risk-aware answer generation")
        return True

    except Exception as e:
        # Top-level boundary of the test: report the failure with its
        # traceback and signal it through the return value.
        print(f"\n❌ Pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def _cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two vectors.

    Returns 0.0 when either vector has zero norm, instead of dividing by
    zero and yielding NaN.
    """
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denominator == 0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / denominator)


def test_embedding_quality():
    """Embed a few fixed sentences and print similarity and norm diagnostics.

    The project ``Embedder`` is imported inside the ``try`` so that a missing
    or broken dependency is reported as a failed run instead of crashing.

    Returns:
        bool: True when all checks run to completion, False if anything
        raises (the error message and traceback are printed).
    """
    print("\n" + "=" * 50)
    print("Testing Embedding Quality")
    print("=" * 50)

    try:
        from retriever import Embedder

        # Initialize embedder
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")

        # Test texts
        test_texts = [
            "Machine learning is a subset of artificial intelligence",
            "The capital of France is Paris",
            "Python is a programming language",
            "Machine learning algorithms learn from data",  # Similar to first
            "Paris is the capital city of France",  # Similar to second
        ]

        print("1. Generating embeddings for test texts...")
        embeddings = embedder.encode(test_texts)
        print(f" ✓ Generated {embeddings.shape[0]} embeddings")

        print("\n2. Testing similarity between related texts...")
        # Index pairs of sentences that are expected to be semantically close
        pairs = [
            (0, 3, "Machine learning texts"),
            (1, 4, "France/Paris texts"),
        ]

        for i, j, description in pairs:
            sim = _cosine_similarity(embeddings[i], embeddings[j])
            print(f" {description}: {sim:.4f}")
            print(f" Text 1: {test_texts[i]}")
            print(f" Text 2: {test_texts[j]}")

        print("\n3. Testing embedding properties...")

        # Print per-embedding L2 norms (values near 1.0 mean normalized output)
        norms = [np.linalg.norm(emb) for emb in embeddings]
        print(f" ✓ Embedding norms: {[f'{n:.4f}' for n in norms]}")

        # Summary statistics over every component of every embedding
        all_embeddings = embeddings.flatten()
        print(" ✓ All embedding values:")
        print(f" - Mean: {np.mean(all_embeddings):.4f}")
        print(f" - Std: {np.std(all_embeddings):.4f}")
        print(f" - Min: {np.min(all_embeddings):.4f}")
        print(f" - Max: {np.max(all_embeddings):.4f}")

        print("\n✓ Embedding quality test completed!")
        return True

    except Exception as e:
        # Top-level boundary of the test: report the failure with its
        # traceback and signal it through the return value.
        print(f"\n❌ Embedding quality test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run every test in the suite and print an overall summary.

    Both tests always run, even if the first one fails.

    Returns:
        bool: True only if every test returned a truthy result.
    """
    print("SafeRAG Real Embedding Test Suite")
    print("=" * 60)

    # Built eagerly as a list (not a generator) so a failing first test
    # cannot short-circuit the second one.
    outcomes = [
        test_embedding_quality(),
        test_real_embedding_pipeline(),
    ]
    success = all(outcomes)

    print("\n" + "=" * 60)
    if success:
        print("🎉 All real embedding tests passed!")
        print("\nThe system can now:")
        print("1. ✓ Load data from knowledge base")
        print("2. ✓ Generate real embeddings using sentence-transformers")
        print("3. ✓ Build FAISS index with real embeddings")
        print("4. ✓ Retrieve relevant passages using real similarity")
        print("5. ✓ Calculate cosine similarity between queries and passages")
        print("6. ✓ Generate answers based on retrieved passages")
        print("7. ✓ Assess embedding quality and properties")
    else:
        print("❌ Some tests failed. Please check the errors above.")

    return success
if __name__ == "__main__":
    # Exit status 0 when the whole suite passed, 1 otherwise.
    sys.exit(int(not main()))