File size: 10,347 Bytes
8a3396b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SafeRAG Real Embedding Test
Load data -> Generate real embeddings using sentence-transformers -> Build index -> Retrieve
"""

import sys
import os
import time
import numpy as np
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

def _timed_call(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and return ``(result, elapsed_seconds)``.

    ``time.perf_counter`` is used instead of ``time.time`` because it is
    monotonic, so a wall-clock adjustment cannot distort a short interval.
    """
    started = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - started


def _cosine_scores(query_vector, passage_matrix):
    """Return the cosine similarity of ``query_vector`` to each matrix row.

    A zero-norm row (or query) scores 0.0 instead of NaN, so sorting by score
    stays well defined.
    """
    query_vector = np.asarray(query_vector)
    passage_matrix = np.asarray(passage_matrix)
    dots = passage_matrix @ query_vector
    denominators = (
        np.linalg.norm(passage_matrix, axis=1) * np.linalg.norm(query_vector)
    )
    scores = np.zeros(dots.shape, dtype=float)
    # ``where`` skips the division wherever the denominator is zero, leaving
    # the pre-filled 0.0 in place.
    np.divide(dots, denominators, out=scores, where=denominators > 0)
    return scores


def test_real_embedding_pipeline():
    """Run the pipeline end to end with real sentence-transformers embeddings.

    Steps, each reported on stdout: load and preprocess the knowledge base,
    embed the passages, build the FAISS index, retrieve for a few fixed
    queries, rank all passages by cosine similarity for one query, and
    generate an answer from retrieved passages.

    Returns:
        bool: True when every step completes. False when any step raises;
        the error and its traceback are printed instead of propagated.
    """
    print("SafeRAG Real Embedding Pipeline Test")
    print("=" * 50)

    try:
        # Project modules are imported inside the ``try`` (and step by step)
        # so a missing or broken dependency is reported as a failure of the
        # step that needs it rather than crashing the script at import time.

        # Step 1: Load data
        print("\n1. Loading data...")
        from data_processing import DataLoader, Preprocessor

        loader = DataLoader()
        preprocessor = Preprocessor()

        kb_passages = loader.get_knowledge_base()
        print(f"   ✓ Loaded {len(kb_passages)} knowledge base passages")

        # Every loaded passage is printed (not just a sample), 1-based.
        for number, passage in enumerate(kb_passages, start=1):
            print(f"     [{number}] {passage}")

        processed_passages = preprocessor.preprocess_passages(kb_passages)
        print(f"   ✓ Preprocessed {len(processed_passages)} passages")

        # Step 2: Generate real embeddings
        print("\n2. Generating real embeddings with sentence-transformers...")
        from retriever import Embedder

        # Use a smaller model for faster testing
        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")
        print(f"   ✓ Loaded embedding model: {embedder.model_name}")
        print(f"   ✓ Embedding dimension: {embedder.get_dimension()}")

        # Each processed passage is indexed by its 'text' key.
        passage_texts = [entry['text'] for entry in processed_passages]

        embeddings, embedding_time = _timed_call(
            embedder.encode_passages, passage_texts
        )

        print(f"   ✓ Generated {embeddings.shape[0]} embeddings in {embedding_time:.3f}s")
        print(f"   ✓ Embedding shape: {embeddings.shape}")
        print(f"   ✓ Embedding type: {type(embeddings)}")

        # Summary statistics over every embedding value
        print("   ✓ Embedding stats:")
        print(f"     - Mean: {np.mean(embeddings):.4f}")
        print(f"     - Std: {np.std(embeddings):.4f}")
        print(f"     - Min: {np.min(embeddings):.4f}")
        print(f"     - Max: {np.max(embeddings):.4f}")

        # Step 3: Build FAISS index
        print("\n3. Building FAISS index...")
        from retriever import FAISSIndex

        index = FAISSIndex(embedder.get_dimension())
        _, build_time = _timed_call(index.build_index, embeddings, passage_texts)

        print(f"   ✓ Built FAISS index in {build_time:.3f}s")
        print(f"   ✓ Index contains {index.index.ntotal} vectors")

        # Step 4: Test retrieval
        print("\n4. Testing retrieval...")
        from retriever import Retriever

        retriever = Retriever(embedder, index, None)  # No reranker for simplicity

        test_queries = [
            "What is machine learning?",
            "Tell me about the capital of France",
            "How does Python work?",
            "What is artificial intelligence?",
        ]

        for query in test_queries:
            print(f"\n   Query: '{query}'")
            results, retrieval_time = _timed_call(
                retriever.retrieve_single, query, k=3
            )

            print(f"   ✓ Retrieved {len(results)} passages in {retrieval_time:.3f}s")
            for rank, hit in enumerate(results, start=1):
                print(f"     [{rank}] Score: {hit['score']:.4f}")
                print(f"         Text: {hit['text'][:100]}...")

        # Step 5: Test similarity calculation
        print("\n5. Testing similarity calculation...")

        query = "What is machine learning?"
        query_embedding = embedder.encode_queries([query])[0]

        print(f"   Query: '{query}'")
        print(f"   Query embedding shape: {query_embedding.shape}")

        # Score the query against all passages at once, then rank descending.
        # A stable sort keeps tied passages in their original order.
        scores = _cosine_scores(query_embedding, embeddings)
        ranking = np.argsort(-scores, kind="stable")

        print(f"   ✓ Calculated similarities with {len(scores)} passages")
        print("   Top 3 most similar passages:")
        for rank, passage_index in enumerate(ranking[:3], start=1):
            print(f"     [{rank}] Similarity: {scores[passage_index]:.4f}")
            print(f"         Text: {passage_texts[passage_index][:80]}...")

        # Step 6: Test generation
        print("\n6. Testing generation...")
        from generator import SafeGenerator, PromptTemplates

        # Instantiated only so a broken constructor fails the test; the
        # instance itself is not used below.
        PromptTemplates()
        generator = SafeGenerator(None, None, 0.3, 0.7)  # Simplified version

        test_query = "What is machine learning?"
        retrieved_passages = retriever.retrieve_single(test_query, k=3)

        print(f"   Query: '{test_query}'")
        print(f"   Retrieved {len(retrieved_passages)} passages")

        result, generation_time = _timed_call(
            generator.generate_with_strategy, test_query, retrieved_passages
        )

        print(f"   ✓ Generated answer in {generation_time:.3f}s")
        print(f"   Answer: {result['answer'][:200]}...")
        print(f"   Risk Score: {result['risk_score']:.3f}")
        print(f"   Strategy: {result['strategy']}")

        print("\n" + "=" * 50)
        print("🎉 Real embedding pipeline test completed successfully!")
        print("\nPipeline Summary:")
        print(f"- Data Loading: {len(kb_passages)} passages")
        print(f"- Real Embedding Generation: {embeddings.shape[0]} vectors ({embeddings.shape[1]}D)")
        print(f"- Index Building: {index.index.ntotal} indexed vectors")
        print(f"- Retrieval: {len(test_queries)} test queries")
        print("- Similarity Calculation: Cosine similarity with all passages")
        print("- Generation: Risk-aware answer generation")

        return True

    except Exception as e:
        # Top-level boundary of the test: report the failure and let the
        # caller decide the exit status instead of crashing the suite.
        print(f"\n❌ Pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

def test_embedding_quality():
    """Embed a few fixed sentences and report similarity and basic statistics.

    Prints the cosine similarity of two hand-picked related sentence pairs,
    the norm of every embedding, and mean/std/min/max over all embedding
    values. Nothing is asserted about the numbers; they are only reported.

    Returns:
        bool: True when the checks run to completion. False when anything
        raises; the error and its traceback are printed instead of propagated.
    """
    print("\n" + "=" * 50)
    print("Testing Embedding Quality")
    print("=" * 50)

    try:
        # Imported inside the ``try`` so a missing dependency is reported as
        # a test failure rather than an import-time crash.
        from retriever import Embedder

        embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")

        test_texts = [
            "Machine learning is a subset of artificial intelligence",
            "The capital of France is Paris",
            "Python is a programming language",
            "Machine learning algorithms learn from data",  # Similar to first
            "Paris is the capital city of France",  # Similar to second
        ]

        print("1. Generating embeddings for test texts...")
        embeddings = embedder.encode(test_texts)
        print(f"   ✓ Generated {embeddings.shape[0]} embeddings")

        # One row per text; norms are computed once and reused below.
        vectors = np.asarray(embeddings)
        norms = np.linalg.norm(vectors, axis=1)

        print("\n2. Testing similarity between related texts...")

        # (index into test_texts, index into test_texts, label)
        pairs = [
            (0, 3, "Machine learning texts"),
            (1, 4, "France/Paris texts"),
        ]

        for first, second, description in pairs:
            scale = norms[first] * norms[second]
            # A zero-norm vector has no direction; report 0.0 rather than NaN.
            if scale > 0:
                sim = float(np.dot(vectors[first], vectors[second]) / scale)
            else:
                sim = 0.0
            print(f"   {description}: {sim:.4f}")
            print(f"     Text 1: {test_texts[first]}")
            print(f"     Text 2: {test_texts[second]}")

        print("\n3. Testing embedding properties...")

        # Norms are shown so the reader can see whether vectors are unit length.
        print(f"   ✓ Embedding norms: {[f'{n:.4f}' for n in norms]}")

        # Statistics over every embedding value
        print("   ✓ All embedding values:")
        print(f"     - Mean: {np.mean(vectors):.4f}")
        print(f"     - Std: {np.std(vectors):.4f}")
        print(f"     - Min: {np.min(vectors):.4f}")
        print(f"     - Max: {np.max(vectors):.4f}")

        print("\n✅ Embedding quality test completed!")
        return True

    except Exception as e:
        # Top-level boundary of the test: report and return a failure flag.
        print(f"\n❌ Embedding quality test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

def main():
    """Run both test suites and print an overall summary.

    Returns:
        bool: True only when both ``test_embedding_quality`` and
        ``test_real_embedding_pipeline`` report success.
    """
    print("SafeRAG Real Embedding Test Suite")
    print("=" * 60)

    # Both suites are evaluated eagerly (a list, not ``and``) so a failure in
    # the first one does not skip the second.
    outcomes = [test_embedding_quality(), test_real_embedding_pipeline()]
    success = all(outcomes)

    print("\n" + "=" * 60)
    if success:
        capabilities = (
            "Load data from knowledge base",
            "Generate real embeddings using sentence-transformers",
            "Build FAISS index with real embeddings",
            "Retrieve relevant passages using real similarity",
            "Calculate cosine similarity between queries and passages",
            "Generate answers based on retrieved passages",
            "Assess embedding quality and properties",
        )
        print("🎉 All real embedding tests passed!")
        print("\nThe system can now:")
        for number, capability in enumerate(capabilities, start=1):
            print(f"{number}. ✅ {capability}")
    else:
        print("❌ Some tests failed. Please check the errors above.")

    return success

# Script entry point: the process exit status mirrors the overall result
# (0 when main() reports success, 1 otherwise).
if __name__ == "__main__":
    exit_status = 0 if main() else 1
    sys.exit(exit_status)