File size: 10,539 Bytes
f9c215a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"""
quick_test.py - Validation Script for RAG Pipeline

This script tests the core functionality of the RAG pipeline:
1. Tests utility functions (chunking, confidence scoring)
2. Tests the embedding model loading
3. Tests the RAG pipeline with a sample query (if data is available)

Run this after ingestion to verify everything works:
    python quick_test.py

This script is designed to work even without API keys by using local mode.
"""

import os
import sys
import json

# Add app directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from app.utils import (
    clean_text,
    chunk_text,
    count_tokens,
    normalize_score,
    compute_confidence,
    format_chunks_for_llm
)
from app.vectorstore import LocalVectorStore


def test_utilities():
    """Test utility functions."""
    print("\n" + "=" * 60)
    print("TEST 1: Utility Functions")
    print("=" * 60)
    
    # Test token counting
    test_text = "This is a sample sentence for testing token counting functionality."
    token_count = count_tokens(test_text)
    print(f"\nβœ“ Token counting: '{test_text[:30]}...' = {token_count} tokens")
    
    # Test text cleaning
    dirty_text = "  This   has   extra    spaces  \n\n\n\nAnd too many newlines Page 123  "
    clean = clean_text(dirty_text)
    print(f"βœ“ Text cleaning: '{dirty_text[:30]}...' -> '{clean[:30]}...'")
    
    # Test chunking
    long_text = "This is a test paragraph. " * 100  # Create a longer text
    chunks = chunk_text(long_text, page_number=1, chunk_size=100, chunk_overlap=20)
    print(f"βœ“ Chunking: Created {len(chunks)} chunks from {count_tokens(long_text)} tokens")
    
    if chunks:
        print(f"  - First chunk ID: {chunks[0]['id']}")
        print(f"  - First chunk tokens: ~{count_tokens(chunks[0]['text'])}")
    
    # Test score normalization
    test_scores = [-1.0, -0.5, 0.0, 0.5, 1.0]
    normalized = [normalize_score(s) for s in test_scores]
    print(f"\nβœ“ Score normalization:")
    for raw, norm in zip(test_scores, normalized):
        print(f"  {raw:5.2f} -> {norm:.3f}")
    
    # Test confidence computation
    sample_scores = [0.8, 0.6, 0.7, 0.5]
    conf_max = compute_confidence(sample_scores, method="max")
    conf_mean = compute_confidence(sample_scores, method="mean")
    print(f"\nβœ“ Confidence computation (from scores {sample_scores}):")
    print(f"  - Max method: {conf_max}")
    print(f"  - Mean method: {conf_mean}")
    
    print("\nβœ… All utility tests passed!")
    return True


def test_local_vectorstore():
    """Test local vector store functionality."""
    print("\n" + "=" * 60)
    print("TEST 2: Local Vector Store")
    print("=" * 60)
    
    import numpy as np
    
    # Create local vector store
    store = LocalVectorStore(dimension=384)
    
    # Create dummy vectors
    vectors = [
        {
            'id': 'test_chunk_1',
            'embedding': np.random.randn(384).tolist(),
            'page': 1,
            'text': 'Agentic AI refers to artificial intelligence systems that can operate autonomously.',
            'source': 'test.pdf'
        },
        {
            'id': 'test_chunk_2',
            'embedding': np.random.randn(384).tolist(),
            'page': 2,
            'text': 'The risks of agentic systems include uncontrolled behavior and safety concerns.',
            'source': 'test.pdf'
        },
        {
            'id': 'test_chunk_3',
            'embedding': np.random.randn(384).tolist(),
            'page': 3,
            'text': 'Safeguards for agentic AI deployment include human oversight and testing.',
            'source': 'test.pdf'
        }
    ]
    
    # Upsert vectors
    count = store.upsert(vectors)
    print(f"\nβœ“ Upserted {count} vectors to local store")
    
    # Query with random vector
    query_vec = np.random.randn(384).tolist()
    results = store.query_top_k(query_vec, k=2)
    
    print(f"βœ“ Query returned {len(results)} results")
    for r in results:
        print(f"  - {r['id']}: score={r['score']:.4f}")
    
    print("\nβœ… Local vector store test passed!")
    return True


def test_embedding_model():
    """Test embedding model loading."""
    print("\n" + "=" * 60)
    print("TEST 3: Embedding Model")
    print("=" * 60)
    
    try:
        from sentence_transformers import SentenceTransformer
        
        print("\nLoading embedding model (this may take a moment)...")
        model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        
        dim = model.get_sentence_embedding_dimension()
        print(f"βœ“ Model loaded successfully!")
        print(f"βœ“ Embedding dimension: {dim}")
        
        # Test encoding
        test_sentences = [
            "What is agentic AI?",
            "Describe the risks of autonomous systems."
        ]
        
        embeddings = model.encode(test_sentences)
        print(f"βœ“ Encoded {len(test_sentences)} sentences")
        print(f"  - Shape: {embeddings.shape}")
        
        print("\nβœ… Embedding model test passed!")
        return True
        
    except Exception as e:
        print(f"\n❌ Embedding model test failed: {e}")
        return False


def test_rag_pipeline():
    """Test the full RAG pipeline (if data is available)."""
    print("\n" + "=" * 60)
    print("TEST 4: RAG Pipeline")
    print("=" * 60)
    
    # Check if we have local data
    chunks_file = "./data/chunks.jsonl"
    vectors_file = "./data/vectors.json"
    
    if not os.path.exists(chunks_file) and not os.path.exists(vectors_file):
        print("\n⚠️  No ingested data found.")
        print("   Run 'python app/ingest.py --pdf ./data/Ebook-Agentic-AI.pdf --local-only' first.")
        print("   Skipping RAG pipeline test.\n")
        return True  # Not a failure, just skip
    
    try:
        from app.rag_pipeline import RAGPipeline
        
        print("\nInitializing RAG pipeline in local mode...")
        
        # Use local mode for testing
        pipeline = RAGPipeline(
            local_only=True,
            chunks_file=chunks_file
        )
        
        # Test query
        test_query = "What is agentic AI?"
        print(f"\nTest query: '{test_query}'")
        print("-" * 40)
        
        result = pipeline.query(test_query, top_k=3, use_llm=False)
        
        # Display result
        print("\nπŸ“€ Response:")
        print(json.dumps(result, indent=2, default=str)[:1000] + "...")
        
        # Validate response structure
        assert "final_answer" in result, "Missing 'final_answer' in response"
        assert "retrieved_chunks" in result, "Missing 'retrieved_chunks' in response"
        assert "confidence" in result, "Missing 'confidence' in response"
        
        print(f"\nβœ“ Final answer length: {len(result['final_answer'])} chars")
        print(f"βœ“ Retrieved chunks: {len(result['retrieved_chunks'])}")
        print(f"βœ“ Confidence score: {result['confidence']}")
        
        # Show retrieved chunks summary
        if result['retrieved_chunks']:
            print("\nπŸ“š Retrieved chunks:")
            for i, chunk in enumerate(result['retrieved_chunks'][:3]):
                print(f"  {i+1}. Page {chunk.get('page', '?')}, Score: {chunk.get('score', 0):.4f}")
                print(f"     ID: {chunk.get('id', 'unknown')}")
                print(f"     Text: {chunk.get('text', '')[:80]}...")
        
        print("\nβœ… RAG pipeline test passed!")
        return True
        
    except Exception as e:
        print(f"\n❌ RAG pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_response_format():
    """Test that responses match the expected format."""
    print("\n" + "=" * 60)
    print("TEST 5: Response Format Validation")
    print("=" * 60)
    
    # Example expected format
    expected_format = {
        "final_answer": "string",
        "retrieved_chunks": [
            {
                "id": "string (format: pdfpage_N_chunk_M)",
                "page": "integer",
                "text": "string",
                "score": "float (0.0-1.0)"
            }
        ],
        "confidence": "float (0.0-1.0)"
    }
    
    print("\nβœ“ Expected response format:")
    print(json.dumps(expected_format, indent=2))
    
    # Validate a mock response
    mock_response = {
        "final_answer": "According to the document, agentic AI is...",
        "retrieved_chunks": [
            {"id": "pdfpage_1_chunk_0", "page": 1, "text": "Sample text...", "score": 0.92}
        ],
        "confidence": 0.92
    }
    
    # Check types
    assert isinstance(mock_response["final_answer"], str), "final_answer must be string"
    assert isinstance(mock_response["retrieved_chunks"], list), "retrieved_chunks must be list"
    assert isinstance(mock_response["confidence"], (int, float)), "confidence must be number"
    assert 0 <= mock_response["confidence"] <= 1, "confidence must be between 0 and 1"
    
    if mock_response["retrieved_chunks"]:
        chunk = mock_response["retrieved_chunks"][0]
        assert "id" in chunk, "chunk must have 'id'"
        assert "page" in chunk, "chunk must have 'page'"
        assert "text" in chunk, "chunk must have 'text'"
        assert "score" in chunk, "chunk must have 'score'"
    
    print("\nβœ… Response format validation passed!")
    return True


def main():
    """Run all tests."""
    print("\n" + "=" * 60)
    print("RAG CHATBOT - QUICK TEST SUITE")
    print("=" * 60)
    
    results = {}
    
    # Run tests
    results["utilities"] = test_utilities()
    results["local_vectorstore"] = test_local_vectorstore()
    results["embedding_model"] = test_embedding_model()
    results["rag_pipeline"] = test_rag_pipeline()
    results["response_format"] = test_response_format()
    
    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    
    passed = sum(1 for v in results.values() if v)
    total = len(results)
    
    for test_name, passed_test in results.items():
        status = "βœ… PASS" if passed_test else "❌ FAIL"
        print(f"  {test_name}: {status}")
    
    print(f"\nTotal: {passed}/{total} tests passed")
    
    if passed == total:
        print("\nπŸŽ‰ All tests passed! The RAG pipeline is ready to use.")
    else:
        print("\n⚠️  Some tests failed. Please check the errors above.")
    
    return passed == total


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)