# safe_rag / real_embedding_test.py
# Tairun Meng
# update
# 8a3396b
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SafeRAG Real Embedding Test
Load data -> Generate real embeddings using sentence-transformers -> Build index -> Retrieve
"""
import sys
import os
import time
import numpy as np
# Put this file's own directory on the import path so the function-local
# imports used by the tests below (data_processing, retriever, generator)
# can be resolved -- presumably those modules live next to this file;
# verify against the repository layout.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def _cosine_similarities_to_query(query_embedding, passage_embeddings):
    """Return the cosine similarity of one query vector to every passage vector.

    Args:
        query_embedding: 1-D array-like query vector.
        passage_embeddings: 2-D array-like with one passage vector per row.

    Returns:
        A 1-D float array holding one score per passage row. A zero-norm
        query or passage yields 0.0 instead of NaN / a divide warning.
    """
    query_vec = np.asarray(query_embedding, dtype=float)
    passage_matrix = np.asarray(passage_embeddings, dtype=float)
    if passage_matrix.size == 0:
        # Nothing to compare against; mirror the "no iterations" outcome
        # of a plain loop over an empty collection.
        return np.zeros(0, dtype=float)

    dots = passage_matrix @ query_vec
    norms = np.linalg.norm(passage_matrix, axis=1) * np.linalg.norm(query_vec)
    scores = np.zeros_like(dots)
    # Only divide where the denominator is non-zero; other slots stay 0.0.
    np.divide(dots, norms, out=scores, where=norms > 0)
    return scores


def test_real_embedding_pipeline():
    """Run the end-to-end pipeline with real sentence-transformers embeddings.

    Steps, in order: load and preprocess the knowledge base, embed the
    passages with ``all-MiniLM-L6-v2`` on CPU, build the index, retrieve for
    a handful of sample queries, rank every passage by cosine similarity to
    one query, and generate an answer from the retrieved passages.

    Project modules are imported inside the ``try`` so that a missing or
    broken dependency is reported as a failed run instead of escaping as an
    exception.

    Returns:
        bool: True when every step completed; False when any step raised
        (the error message and traceback are printed).
    """
    print("SafeRAG Real Embedding Pipeline Test")
    print("=" * 50)
    try:
        # Step 1: Load data
        print("\n1. Loading data...")
        from data_processing import DataLoader, Preprocessor

        loader = DataLoader()
        preprocessor = Preprocessor()

        # Load knowledge base
        kb_passages = loader.get_knowledge_base()
        print(f" ✓ Loaded {len(kb_passages)} knowledge base passages")

        # Print every loaded passage (not just a sample).
        for position, passage in enumerate(kb_passages, start=1):
            print(f" [{position}] {passage}")

        # Preprocess passages
        processed_passages = preprocessor.preprocess_passages(kb_passages)
        print(f" ✓ Preprocessed {len(processed_passages)} passages")

        # Step 2: Generate real embeddings
        print("\n2. Generating real embeddings with sentence-transformers...")
        from retriever import Embedder

        # Use a smaller model for faster testing
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")
        print(f" ✓ Loaded embedding model: {embedder.model_name}")
        print(f" ✓ Embedding dimension: {embedder.get_dimension()}")

        # Extract text from processed passages
        passage_texts = [p['text'] for p in processed_passages]

        # Generate embeddings. perf_counter() is used for all timings because
        # it is monotonic, unlike the adjustable wall clock of time.time().
        start_time = time.perf_counter()
        embeddings = embedder.encode_passages(passage_texts)
        embedding_time = time.perf_counter() - start_time
        print(f" ✓ Generated {embeddings.shape[0]} embeddings in {embedding_time:.3f}s")
        print(f" ✓ Embedding shape: {embeddings.shape}")
        print(f" ✓ Embedding type: {type(embeddings)}")

        # Show embedding statistics
        print(" ✓ Embedding stats:")
        print(f" - Mean: {np.mean(embeddings):.4f}")
        print(f" - Std: {np.std(embeddings):.4f}")
        print(f" - Min: {np.min(embeddings):.4f}")
        print(f" - Max: {np.max(embeddings):.4f}")

        # Step 3: Build FAISS index
        print("\n3. Building FAISS index...")
        from retriever import FAISSIndex

        index = FAISSIndex(embedder.get_dimension())
        start_time = time.perf_counter()
        index.build_index(embeddings, passage_texts)
        build_time = time.perf_counter() - start_time
        print(f" ✓ Built FAISS index in {build_time:.3f}s")
        print(f" ✓ Index contains {index.index.ntotal} vectors")

        # Step 4: Test retrieval
        print("\n4. Testing retrieval...")
        from retriever import Retriever

        retriever = Retriever(embedder, index, None)  # No reranker for simplicity
        test_queries = [
            "What is machine learning?",
            "Tell me about the capital of France",
            "How does Python work?",
            "What is artificial intelligence?",
        ]
        for query in test_queries:
            print(f"\n Query: '{query}'")
            start_time = time.perf_counter()
            results = retriever.retrieve_single(query, k=3)
            retrieval_time = time.perf_counter() - start_time
            print(f" ✓ Retrieved {len(results)} passages in {retrieval_time:.3f}s")
            for rank, result in enumerate(results, start=1):
                print(f" [{rank}] Score: {result['score']:.4f}")
                print(f" Text: {result['text'][:100]}...")

        # Step 5: Test similarity calculation
        print("\n5. Testing similarity calculation...")
        similarity_query = "What is machine learning?"
        query_embedding = embedder.encode_queries([similarity_query])[0]
        print(f" Query: '{similarity_query}'")
        print(f" Query embedding shape: {query_embedding.shape}")

        # Score all passages at once, then rank from most to least similar.
        # A stable sort keeps the original passage order among ties.
        scores = _cosine_similarities_to_query(query_embedding, embeddings)
        ranking = np.argsort(-scores, kind="stable")
        print(f" ✓ Calculated similarities with {len(scores)} passages")
        print(" Top 3 most similar passages:")
        for rank, passage_idx in enumerate(ranking[:3], start=1):
            print(f" [{rank}] Similarity: {scores[passage_idx]:.4f}")
            print(f" Text: {passage_texts[passage_idx][:80]}...")

        # Step 6: Test generation
        print("\n6. Testing generation...")
        from generator import SafeGenerator, PromptTemplates

        # Constructed but not used further below; kept so the class is still
        # instantiated as part of this run.
        templates = PromptTemplates()
        generator = SafeGenerator(None, None, 0.3, 0.7)  # Simplified version
        test_query = "What is machine learning?"
        retrieved_passages = retriever.retrieve_single(test_query, k=3)
        print(f" Query: '{test_query}'")
        print(f" Retrieved {len(retrieved_passages)} passages")

        # Generate answer
        start_time = time.perf_counter()
        result = generator.generate_with_strategy(test_query, retrieved_passages)
        generation_time = time.perf_counter() - start_time
        print(f" ✓ Generated answer in {generation_time:.3f}s")
        print(f" Answer: {result['answer'][:200]}...")
        print(f" Risk Score: {result['risk_score']:.3f}")
        print(f" Strategy: {result['strategy']}")

        print("\n" + "=" * 50)
        print("🎉 Real embedding pipeline test completed successfully!")
        print("\nPipeline Summary:")
        print(f"- Data Loading: {len(kb_passages)} passages")
        print(f"- Real Embedding Generation: {embeddings.shape[0]} vectors ({embeddings.shape[1]}D)")
        print(f"- Index Building: {index.index.ntotal} indexed vectors")
        print(f"- Retrieval: {len(test_queries)} test queries")
        print("- Similarity Calculation: Cosine similarity with all passages")
        print("- Generation: Risk-aware answer generation")
        return True
    except Exception as e:
        # Top-level boundary of the test: report the failure with a full
        # traceback and signal it through the return value.
        print(f"\n❌ Pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def _cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two vectors, or 0.0 if either has zero norm."""
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denominator == 0:
        # Avoid a NaN result / divide warning for a degenerate vector.
        return 0.0
    return float(np.dot(vec_a, vec_b) / denominator)


def test_embedding_quality():
    """Check basic properties of embeddings produced by the real model.

    Embeds five short texts with ``all-MiniLM-L6-v2`` on CPU, prints the
    cosine similarity of the two related pairs, then prints the per-vector
    norms and the mean/std/min/max over all embedding values.

    The project import happens inside the ``try`` so that a missing
    dependency is reported as a failed run rather than raised.

    Returns:
        bool: True when all checks ran; False when anything raised (the
        error message and traceback are printed).
    """
    print("\n" + "=" * 50)
    print("Testing Embedding Quality")
    print("=" * 50)
    try:
        from retriever import Embedder

        # Initialize embedder
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")

        # Test texts
        test_texts = [
            "Machine learning is a subset of artificial intelligence",
            "The capital of France is Paris",
            "Python is a programming language",
            "Machine learning algorithms learn from data",  # Similar to first
            "Paris is the capital city of France",  # Similar to second
        ]

        print("1. Generating embeddings for test texts...")
        embeddings = embedder.encode(test_texts)
        print(f" ✓ Generated {embeddings.shape[0]} embeddings")

        print("\n2. Testing similarity between related texts...")
        # (index into test_texts, index into test_texts, label)
        pairs = [
            (0, 3, "Machine learning texts"),
            (1, 4, "France/Paris texts"),
        ]
        for i, j, description in pairs:
            sim = _cosine_similarity(embeddings[i], embeddings[j])
            print(f" {description}: {sim:.4f}")
            print(f" Text 1: {test_texts[i]}")
            print(f" Text 2: {test_texts[j]}")

        print("\n3. Testing embedding properties...")
        # Report the norms so the reader can see whether vectors are normalized
        norms = [np.linalg.norm(emb) for emb in embeddings]
        print(f" ✓ Embedding norms: {[f'{n:.4f}' for n in norms]}")

        # Statistics over every individual embedding value
        all_embeddings = embeddings.flatten()
        print(" ✓ All embedding values:")
        print(f" - Mean: {np.mean(all_embeddings):.4f}")
        print(f" - Std: {np.std(all_embeddings):.4f}")
        print(f" - Min: {np.min(all_embeddings):.4f}")
        print(f" - Max: {np.max(all_embeddings):.4f}")

        print("\n✅ Embedding quality test completed!")
        return True
    except Exception as e:
        # Top-level boundary of the test: report and signal via return value.
        print(f"\n❌ Embedding quality test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run both test suites and print an overall summary.

    Returns:
        bool: True only if both ``test_embedding_quality`` and
        ``test_real_embedding_pipeline`` returned a truthy result.
    """
    print("SafeRAG Real Embedding Test Suite")
    print("=" * 60)

    # Build the list eagerly so both suites always run, in this order, even
    # when the first one fails (no short-circuiting).
    outcomes = [
        test_embedding_quality(),
        test_real_embedding_pipeline(),
    ]
    success = all(outcomes)

    print("\n" + "=" * 60)
    if success:
        print("🎉 All real embedding tests passed!")
        print("\nThe system can now:")
        print("1. ✅ Load data from knowledge base")
        print("2. ✅ Generate real embeddings using sentence-transformers")
        print("3. ✅ Build FAISS index with real embeddings")
        print("4. ✅ Retrieve relevant passages using real similarity")
        print("5. ✅ Calculate cosine similarity between queries and passages")
        print("6. ✅ Generate answers based on retrieved passages")
        print("7. ✅ Assess embedding quality and properties")
    else:
        print("❌ Some tests failed. Please check the errors above.")
    return success
if __name__ == "__main__":
    # Translate the suite outcome into a process exit status:
    # 0 when everything passed, 1 otherwise.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)