# File size: 15,564 Bytes
# c19c7bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
"""
Automated RAG pipeline validation script.
Tests end-to-end functionality, multi-tenant isolation, and anti-hallucination.
"""
import httpx
import time
import json
from pathlib import Path
from typing import Dict, List, Any, Tuple
import sys

# Put the directory one level above this script's folder on the import path
_PARENT_DIR = Path(__file__).parent.parent
sys.path.insert(0, str(_PARENT_DIR))

# Service endpoint plus the tenant / user / knowledge-base ids used by the tests
BASE_URL = "http://localhost:8000"
TEST_TENANT_A, TEST_TENANT_B = "tenant_A", "tenant_B"
TEST_USER_A, TEST_USER_B = "user_A", "user_B"
TEST_KB_A, TEST_KB_B = "kb_A", "kb_B"

# Test documents, one markdown file per tenant
_TEST_DOCS_DIR = _PARENT_DIR / "data" / "test_docs"
TENANT_A_DOC = _TEST_DOCS_DIR / "tenant_A_kb.md"
TENANT_B_DOC = _TEST_DOCS_DIR / "tenant_B_kb.md"

# Accumulates one {"test", "passed", "reason"} record per reported test
test_results: List[Dict[str, Any]] = []


def print_header(text: str):
    """Write a banner to stdout: a blank line, an 80-column rule of '=',
    the title indented by two spaces, and a closing rule."""
    rule = "=" * 80
    print("\n" + rule, f"  {text}", rule, sep="\n")


def print_test(test_name: str, passed: bool, reason: str = ""):
    """Report a single test outcome and remember it for the summary.

    Args:
        test_name: Label printed next to the status tag.
        passed: Truthy prints ``[PASS]``, falsy prints ``[FAIL]``.
        reason: Optional detail; printed on its own line when non-empty.

    The outcome is appended to the module-level ``test_results`` list.
    """
    if passed:
        tag = "[PASS]"
    else:
        tag = "[FAIL]"
    print(f"{tag} | {test_name}")

    if reason:
        print(f"      └─ {reason}")

    outcome = {"test": test_name, "passed": passed, "reason": reason}
    test_results.append(outcome)


def wait_for_server(max_retries: int = 10, delay: int = 2) -> bool:
    """Poll ``{BASE_URL}/health`` until it answers 200 or attempts run out.

    Args:
        max_retries: Maximum number of health-check attempts.
        delay: Seconds to sleep between two consecutive attempts.

    Returns:
        True as soon as a health check returns HTTP 200, otherwise False
        once every attempt has failed.
    """
    print("Waiting for server to be ready...")
    last_problem = ""
    for attempt in range(1, max_retries + 1):
        try:
            response = httpx.get(f"{BASE_URL}/health", timeout=5)
            if response.status_code == 200:
                print("[OK] Server is ready")
                return True
            last_problem = f"HTTP {response.status_code}"
        except Exception as exc:
            # Best-effort polling: failures are tolerated here, but the most
            # recent one is kept so the final message can say what went wrong.
            last_problem = f"{type(exc).__name__}: {exc}"
        if attempt < max_retries:
            # Only wait (and announce a retry) when another attempt follows.
            time.sleep(delay)
            print(f"  Retry {attempt}/{max_retries}...")
    print("[FAIL] Server not ready after max retries")
    if last_problem:
        print(f"       last error: {last_problem}")
    return False


def upload_document(
    client: httpx.Client,
    file_path: Path,
    tenant_id: str,
    user_id: str,
    kb_id: str
) -> Dict[str, Any]:
    """POST a file to ``/kb/upload`` as a multipart form.

    Args:
        client: HTTP client used to send the request.
        file_path: Document to upload; sent with content type text/markdown.
        tenant_id: Value of the ``tenant_id`` form field.
        user_id: Value of the ``user_id`` form field.
        kb_id: Value of the ``kb_id`` form field.

    Returns:
        ``{"success": True, "data": <decoded JSON>}`` on HTTP 200, otherwise
        ``{"success": False, "error": <response text or exception message>}``.
    """
    form_fields = {
        "tenant_id": tenant_id,
        "user_id": user_id,
        "kb_id": kb_id
    }
    try:
        with open(file_path, "rb") as handle:
            attachment = {"file": (file_path.name, handle, "text/markdown")}
            reply = client.post(
                f"{BASE_URL}/kb/upload",
                files=attachment,
                data=form_fields,
                timeout=60
            )
            if reply.status_code != 200:
                return {"success": False, "error": reply.text}
            return {"success": True, "data": reply.json()}
    except Exception as exc:
        # Any failure (missing file, transport error, bad JSON) is reported
        # to the caller as an unsuccessful result instead of propagating.
        return {"success": False, "error": str(exc)}


def test_retrieval(
    client: httpx.Client,
    query: str,
    tenant_id: str,
    user_id: str,
    kb_id: str,
    expected_keywords: List[str],
    should_not_contain: List[str] = None,
    top_k: int = 5
) -> Tuple[bool, str]:
    """Query ``/kb/search`` and validate the returned chunks.

    The check fails when the API does not return 200, when no results come
    back, when any result's ``metadata.tenant_id`` differs from ``tenant_id``,
    when none of ``expected_keywords`` occurs in the combined result content,
    or when any ``should_not_contain`` entry does. Matching is
    case-insensitive.

    Returns:
        ``(passed, reason)`` where ``reason`` is a human-readable explanation.
    """
    try:
        # Tenant and user travel as headers (dev mode auth); the query itself
        # goes in the URL parameters of the GET request.
        reply = client.get(
            f"{BASE_URL}/kb/search",
            params={"query": query, "kb_id": kb_id, "top_k": top_k},
            headers={"X-Tenant-Id": tenant_id, "X-User-Id": user_id},
            timeout=30
        )
        if reply.status_code != 200:
            return False, f"API returned {reply.status_code}: {reply.text}"

        hits = reply.json().get("results", [])
        if not hits:
            return False, "No results retrieved"

        # Every hit must be tagged with the requesting tenant.
        for hit in hits:
            owner = hit.get("metadata", {}).get("tenant_id")
            if owner != tenant_id:
                return False, f"Tenant leak detected! Got tenant_id={owner}, expected {tenant_id}"

        # Keyword checks run against the lower-cased concatenation of all hits.
        haystack = " ".join(hit.get("content", "") for hit in hits).lower()

        matched = [kw for kw in expected_keywords if kw.lower() in haystack]
        if not matched:
            return False, f"Expected keywords not found: {expected_keywords}"

        for banned in should_not_contain or []:
            if banned.lower() in haystack:
                return False, f"Forbidden content found: {banned}"

        return True, f"Retrieved {len(hits)} results, found keywords: {matched}"

    except Exception as exc:
        return False, f"Error: {str(exc)}"


def test_chat(
    client: httpx.Client,
    question: str,
    tenant_id: str,
    user_id: str,
    kb_id: str,
    expected_keywords: List[str] = None,
    should_refuse: bool = False,
    should_not_contain: List[str] = None
) -> Tuple[bool, str, Dict[str, Any]]:
    """Send one question to ``/chat`` and validate the response.

    Args:
        client: HTTP client used to send the request.
        question: Question text posted to the endpoint.
        tenant_id: Tenant sent both in the JSON body and the ``X-Tenant-Id`` header.
        user_id: User sent both in the JSON body and the ``X-User-Id`` header.
        kb_id: Knowledge base id sent in the JSON body.
        expected_keywords: If given, at least one must occur in the answer
            (case-insensitive).
        should_refuse: When True the response must look like a refusal: no
            citations, and either a refusal flag or refusal wording.
        should_not_contain: If given, none of these may occur in the answer.

    Returns:
        ``(passed, reason, data)`` where ``data`` is the decoded JSON body, or
        ``{}`` when the request raised or returned a non-200 status.
    """
    try:
        # Include headers for dev mode auth
        headers = {
            "X-Tenant-Id": tenant_id,
            "X-User-Id": user_id
        }
        response = client.post(
            f"{BASE_URL}/chat",
            json={
                "tenant_id": tenant_id,
                "user_id": user_id,
                "kb_id": kb_id,
                "question": question
            },
            headers=headers,
            timeout=60
        )

        if response.status_code != 200:
            return False, f"API returned {response.status_code}: {response.text}", {}

        data = response.json()
        # The `or` fallbacks keep explicit JSON nulls from crashing the checks.
        answer = (data.get("answer") or "").lower()
        citations = data.get("citations") or []
        from_kb = data.get("from_knowledge_base", False)
        confidence = data.get("confidence") or 0.0
        metadata = data.get("metadata") or {}
        # The refusal flag is looked up at the top level and inside metadata;
        # a truthy value in either place counts as an explicit refusal.
        refused = bool(data.get("refused", False) or metadata.get("refused", False))

        # Check refusal behavior (STRICT)
        if should_refuse:
            refusal_keywords = [
                "couldn't find", "don't have", "not available", "contact support",
                "not in the knowledge base", "could not verify", "not enough information",
                "apologize", "couldn't find relevant information"
            ]
            has_refusal_keywords = any(kw in answer for kw in refusal_keywords)

            # An answer backed by citations means the model did not refuse.
            if citations:
                return False, (
                    f"Should have refused but generated answer with {len(citations)} citations. "
                    f"Answer: {answer[:300]}"
                ), data

            # A confident answer without refusal wording is a FAIL, regardless
            # of the refusal flag.
            if confidence >= 0.30 and answer and not has_refusal_keywords:
                return False, (
                    f"Should have refused but generated answer with confidence {confidence:.2f}. "
                    f"Answer: {answer[:300]}"
                ), data

            # Neither an explicit flag nor refusal wording: it's a FAIL.
            if not refused and not has_refusal_keywords:
                return False, (
                    f"Should have refused but didn't. "
                    f"refused={refused}, confidence={confidence:.2f}, citations={len(citations)}. "
                    f"Answer: {answer[:300]}"
                ), data

            return True, f"Properly refused (refused={refused}, confidence={confidence:.2f})", data

        # Check for expected keywords (any single match is enough)
        if expected_keywords:
            found = [kw for kw in expected_keywords if kw.lower() in answer]
            if not found:
                return False, f"Expected keywords not found: {expected_keywords}. Answer: {answer[:200]}", data

        # An answer that claims to come from the KB must cite something.
        if from_kb and not citations:
            return False, "Answer claims to be from KB but has no citations", data

        # Check for forbidden content
        if should_not_contain:
            for forbidden in should_not_contain:
                if forbidden.lower() in answer:
                    return False, f"Forbidden content found in answer: {forbidden}", data

        message = f"Answer generated (confidence: {confidence:.2f}, citations: {len(citations)})"

        # Citation integrity is advisory: keywords that appear in the answer
        # but in none of the citation excerpts are reported as a warning in
        # the reason text and do not fail the test.
        if citations and expected_keywords:
            citation_text = " ".join((c.get("excerpt") or "") for c in citations).lower()
            uncited = [
                kw for kw in expected_keywords
                if kw.lower() in answer and kw.lower() not in citation_text
            ]
            if uncited:
                message += f" [warning: keywords not found in citation excerpts: {uncited}]"

        return True, message, data

    except Exception as e:
        return False, f"Error: {str(e)}", {}


def main() -> int:
    """Run all validation tests.

    Phases: wait for the server, upload one document per tenant, run the
    retrieval checks, run the chat checks, then print a summary.

    Returns:
        Exit status for the process: 0 when every test passed, 1 when any
        test failed or the suite had to abort (server not ready, upload
        failed).
    """
    print_header("RAG Pipeline Validation Suite")

    # Check server
    if not wait_for_server():
        print("[FAIL] Cannot proceed without server")
        # Non-zero so shells and CI do not mistake an aborted run for success.
        return 1

    # (tenant_id, user_id, kb_id) triples, unpacked positionally into the helpers.
    tenant_a = (TEST_TENANT_A, TEST_USER_A, TEST_KB_A)
    tenant_b = (TEST_TENANT_B, TEST_USER_B, TEST_KB_B)

    # The context manager closes the client on every exit path, including the
    # early returns below.
    with httpx.Client(timeout=120.0) as client:
        # ========== PHASE 1: Upload Documents ==========
        print_header("Phase 1: Upload Test Documents")

        for doc_path, identity in ((TENANT_A_DOC, tenant_a), (TENANT_B_DOC, tenant_b)):
            print(f"\n📤 Uploading {doc_path.name} for {identity[0]}...")
            result = upload_document(client, doc_path, *identity)
            if not result["success"]:
                print(f"[FAIL] Upload failed: {result.get('error')}")
                return 1
            print("[OK] Upload successful")
            print("⏳ Waiting for document processing (10 seconds)...")
            time.sleep(10)  # Wait for processing (parsing, chunking, embedding)

        # ========== PHASE 2: Retrieval Tests ==========
        print_header("Phase 2: Retrieval Accuracy Tests")

        # (label, query, identity, expected keywords, forbidden keywords).
        # Each tenant's forbidden value is the other tenant's expected one.
        retrieval_cases = [
            ("Retrieval: Tenant A - Refund Window",
             "What is the refund window?", tenant_a, ["7 days"], ["30 days"]),
            ("Retrieval: Tenant B - Refund Window",
             "What is the refund window?", tenant_b, ["30 days"], ["7 days"]),
            ("Retrieval: Tenant A - Starter Plan Price (Isolation)",
             "Starter plan price", tenant_a, ["499"], ["999"]),
            ("Retrieval: Tenant B - Starter Plan Price (Isolation)",
             "Starter plan price", tenant_b, ["999"], ["499"]),
        ]
        for label, query, identity, expected, forbidden in retrieval_cases:
            passed, reason = test_retrieval(
                client,
                query,
                *identity,
                expected_keywords=expected,
                should_not_contain=forbidden
            )
            print_test(label, passed, reason)

        # ========== PHASE 3: Chat Tests ==========
        print_header("Phase 3: Chat Endpoint Tests")

        chat_cases = [
            ("Chat: Tenant A - Refund Window",
             "What is the refund window?", tenant_a, ["7 days"], ["30 days"]),
            ("Chat: Tenant B - Refund Window",
             "What is the refund window?", tenant_b, ["30 days"], ["7 days"]),
            ("Chat: Tenant A - Starter Plan Price",
             "What is the Starter plan price?", tenant_a, ["499"], ["999"]),
            ("Chat: Tenant B - Starter Plan Price",
             "What is the Starter plan price?", tenant_b, ["999"], ["499"]),
        ]
        for label, question, identity, expected, forbidden in chat_cases:
            passed, reason, _ = test_chat(
                client,
                question,
                *identity,
                expected_keywords=expected,
                should_not_contain=forbidden
            )
            print_test(label, passed, reason)

        # Hallucination refusal - out of scope question
        passed, reason, _ = test_chat(
            client,
            "How to integrate ClientSphere with Shopify?",
            *tenant_a,
            should_refuse=True
        )
        print_test("Chat: Hallucination Refusal (Out of Scope)", passed, reason)

        # Citation integrity: a passing answer must also carry citations
        passed, reason, data = test_chat(
            client,
            "How long do password reset links last?",
            *tenant_a,
            expected_keywords=["15"]
        )
        if passed:
            citations = data.get("citations", [])
            if citations:
                print_test("Chat: Citation Integrity", True, f"Found {len(citations)} citations")
            else:
                print_test("Chat: Citation Integrity", False, "No citations provided")
        else:
            print_test("Chat: Citation Integrity", False, reason)

    # ========== PHASE 4: Summary ==========
    print_header("Test Summary")

    total_tests = len(test_results)
    passed_tests = sum(1 for r in test_results if r["passed"])
    failed_tests = total_tests - passed_tests
    # Guard the division so an empty result list cannot raise.
    success_rate = passed_tests / total_tests * 100 if total_tests else 0.0

    print(f"\nTotal Tests: {total_tests}")
    print(f"[PASS] Passed: {passed_tests}")
    print(f"[FAIL] Failed: {failed_tests}")
    print(f"Success Rate: {success_rate:.1f}%")

    if failed_tests > 0:
        print("\n[FAIL] Failed Tests:")
        for result in test_results:
            if not result["passed"]:
                print(f"  - {result['test']}: {result['reason']}")

    # Final verdict
    print_header("Final Verdict")
    if failed_tests == 0:
        print("[PASS] ALL TESTS PASSED - RAG Pipeline is working correctly")
        return 0

    print(f"[FAIL] {failed_tests} TEST(S) FAILED - Review issues above")
    return 1


if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())