File size: 16,980 Bytes
c509b44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78b6d7b
c509b44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78b6d7b
c509b44
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
"""
verify_tenant_isolation.py
Script to verify tenant_id is properly used for data isolation

Usage:
    python verify_tenant_isolation.py

This script tests:
- Admin rules isolation
- Analytics isolation
- RAG document isolation
- Database direct verification
"""

import requests
import json
from pathlib import Path
import sys

# Make the script's own directory and its backend/ subdirectory importable.
# Both are placed at the front of sys.path, with the script's directory first
# and backend/ right behind it.
root_dir = Path(__file__).parent
backend_dir = root_dir / "backend"
sys.path[:0] = [str(root_dir), str(backend_dir)]

# Base address every HTTP check in this script sends its requests to.
BASE_URL = "http://localhost:8000"


def print_section(title):
    """Print *title* as a banner between two 60-character ``=`` rules.

    The top rule is preceded by a blank line and the title is indented by
    two spaces.
    """
    rule = "=" * 60
    banner_lines = ("\n" + rule, f"  {title}", rule)
    for banner_line in banner_lines:
        print(banner_line)


def _rule_listed(rules, rule_text):
    """Return True when *rule_text* appears among the entries of *rules*.

    The shape of the entries returned by the API is not visible from this
    script, so an entry counts as a match when it either equals the text
    itself or is a dict holding the text as one of its values.
    """
    return any(
        entry == rule_text
        or (isinstance(entry, dict) and rule_text in entry.values())
        for entry in rules
    )


def verify_admin_rules_isolation():
    """Verify admin rules are isolated by tenant_id.

    Adds one rule per tenant through ``POST /admin/rules``, reads each
    tenant's rules back through ``GET /admin/rules`` (tenant selected by the
    ``x-tenant-id`` header) and checks that neither tenant is shown the
    other's rule.

    Returns:
        True when both tenants see their own rule and not the other's.
        False when a rule leaks across tenants or an unexpected error occurs.
        None when the API is unreachable, or when a tenant's own rule is not
        returned, in which case isolation cannot be judged.
    """
    print_section("Testing Admin Rules Isolation")

    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    tenant1_rule_text = f"Rule for {tenant1}"
    tenant2_rule_text = f"Rule for {tenant2}"

    try:
        # Steps 1-2: add one distinct rule per tenant.
        rules_to_add = (
            (tenant1, tenant1_rule_text, "high"),
            (tenant2, tenant2_rule_text, "low"),
        )
        for step, (tenant, rule_text, severity) in enumerate(rules_to_add, 1):
            print(f"\n{step}. Adding rule for {tenant}...")
            response = requests.post(
                f"{BASE_URL}/admin/rules",
                headers={"x-tenant-id": tenant, "Content-Type": "application/json"},
                json={"rule": rule_text, "severity": severity},
                timeout=5
            )
            print(f"   Status: {response.status_code}")

        # Steps 3-4: read back what each tenant is allowed to see.
        rules_by_tenant = {}
        for step, tenant in enumerate((tenant1, tenant2), 3):
            print(f"\n{step}. Getting rules for {tenant}...")
            response = requests.get(
                f"{BASE_URL}/admin/rules",
                headers={"x-tenant-id": tenant},
                timeout=5
            )
            tenant_rules = response.json().get("rules", [])
            print(f"   Found {len(tenant_rules)} rules")
            print(f"   Rules: {tenant_rules}")
            rules_by_tenant[tenant] = tenant_rules

        tenant1_rules = rules_by_tenant[tenant1]
        tenant2_rules = rules_by_tenant[tenant2]

        # Verify isolation
        print("\n5. Verifying isolation...")
        tenant1_has_own_rule = _rule_listed(tenant1_rules, tenant1_rule_text)
        tenant1_has_other_rule = _rule_listed(tenant1_rules, tenant2_rule_text)

        tenant2_has_own_rule = _rule_listed(tenant2_rules, tenant2_rule_text)
        tenant2_has_other_rule = _rule_listed(tenant2_rules, tenant1_rule_text)

        print(f"   Tenant1 has own rule: {tenant1_has_own_rule} {'✓' if tenant1_has_own_rule else '✗ MISSING'}")
        print(f"   Tenant1 has other's rule: {tenant1_has_other_rule} {'✗ FAILED!' if tenant1_has_other_rule else '✓ PASSED'}")
        print(f"   Tenant2 has own rule: {tenant2_has_own_rule} {'✓' if tenant2_has_own_rule else '✗ MISSING'}")
        print(f"   Tenant2 has other's rule: {tenant2_has_other_rule} {'✗ FAILED!' if tenant2_has_other_rule else '✓ PASSED'}")

        # A leak is a failure no matter what else was observed.
        if tenant1_has_other_rule or tenant2_has_other_rule:
            print("\n❌ Admin Rules Isolation: FAILED")
            return False

        # Without each tenant's own rule coming back, "no foreign rule seen"
        # would also hold for empty or error responses, so it proves nothing.
        if tenant1_has_own_rule and tenant2_has_own_rule:
            print("\n✅ Admin Rules Isolation: PASSED")
            return True

        print("\n⚠️ Admin Rules Isolation: INCONCLUSIVE (a tenant's own rule was not returned)")
        return None

    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print("   uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def verify_analytics_isolation():
    """Verify analytics are isolated by tenant_id.

    Sends one message per tenant to ``POST /agent/message``, then reads each
    tenant's ``GET /analytics/overview?days=30`` (tenant selected by the
    ``x-tenant-id`` header) and compares the reported ``total_queries``.

    NOTE(review): a positive count for both tenants shows that each tenant
    has analytics data; it does not by itself prove the two counts are
    disjoint. A stricter check needs knowledge of how the API counts queries.

    Returns:
        True when both tenants report at least one query.
        None when the API is unreachable or a tenant reports no queries
        (not enough data to judge).
        False when an unexpected error occurs.
    """
    print_section("Testing Analytics Isolation")

    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"

    try:
        # Make queries for different tenants
        print(f"\n1. Making query as {tenant1}...")
        response = requests.post(
            f"{BASE_URL}/agent/message",
            json={"tenant_id": tenant1, "message": "Test query from tenant1"},
            timeout=10
        )
        print(f"   Status: {response.status_code}")

        print(f"\n2. Making query as {tenant2}...")
        response = requests.post(
            f"{BASE_URL}/agent/message",
            json={"tenant_id": tenant2, "message": "Test query from tenant2"},
            timeout=10
        )
        print(f"   Status: {response.status_code}")

        # Get analytics for tenant1
        print(f"\n3. Getting analytics for {tenant1}...")
        response = requests.get(
            f"{BASE_URL}/analytics/overview?days=30",
            headers={"x-tenant-id": tenant1},
            timeout=5
        )
        tenant1_queries = response.json().get('total_queries', 0)
        print(f"   Total queries: {tenant1_queries}")

        # Get analytics for tenant2
        print(f"\n4. Getting analytics for {tenant2}...")
        response = requests.get(
            f"{BASE_URL}/analytics/overview?days=30",
            headers={"x-tenant-id": tenant2},
            timeout=5
        )
        tenant2_queries = response.json().get('total_queries', 0)
        print(f"   Total queries: {tenant2_queries}")

        print("\n5. Verifying isolation...")
        print(f"   Tenant1 queries: {tenant1_queries}")
        print(f"   Tenant2 queries: {tenant2_queries}")

        if tenant1_queries > 0 and tenant2_queries > 0:
            print("\n✅ Analytics Isolation: PASSED (both tenants have their own data)")
            return True

        # Nothing was verified here, so report "inconclusive" (None) instead
        # of letting the summary count this as a pass.
        print("\n⚠️ Analytics Isolation: Need more queries to verify")
        return None

    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print("   uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def _result_text(result):
    """Return the text one RAG search result is matched on.

    Dict results use their ``text`` field, then ``content``, then the dict's
    string form; any other result type is matched on its string form.
    """
    if isinstance(result, dict):
        return str(result.get("text", "") or result.get("content", "") or result)
    return str(result)


def _texts_containing(results, needle):
    """Return the text of every entry in *results* that contains *needle*."""
    return [text for text in map(_result_text, results) if needle in text]


def verify_rag_isolation():
    """Verify RAG documents are isolated by tenant_id.

    Ingests one document with a distinct secret per tenant through
    ``POST /rag/ingest-document``, lists each tenant's documents through
    ``GET /rag/list``, then searches for tenant1's secret via
    ``POST /rag/search`` first as tenant1 and then as tenant2. Only the
    returned result entries are inspected, not the whole response body.

    Returns:
        False when tenant2's search results contain tenant1's secret, or
        when an unexpected error occurs.
        True when tenant1 finds its own secret and tenant2 does not.
        None when the servers are unreachable or tenant1 cannot find its own
        secret (nothing to judge isolation on).
    """
    print_section("Testing RAG Document Isolation")

    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    secret_marker = "TENANT1_SECRET"

    try:
        # Steps 1-2: ingest one confidential document per tenant.
        documents = (
            (tenant1, "This is a confidential document for Tenant 1 only. Secret code: TENANT1_SECRET_12345"),
            (tenant2, "This is a confidential document for Tenant 2 only. Secret code: TENANT2_SECRET_67890"),
        )
        for step, (tenant, content) in enumerate(documents, 1):
            print(f"\n{step}. Ingesting document for {tenant}...")
            response = requests.post(
                f"{BASE_URL}/rag/ingest-document",
                headers={"x-tenant-id": tenant, "Content-Type": "application/json"},
                json={"content": content, "source_type": "raw_text"},
                timeout=10
            )
            print(f"   Status: {response.status_code}")
            if response.status_code != 200:
                print(f"   Error: {response.text}")

        # Steps 3-4: list each tenant's documents (informational only).
        for step, tenant in enumerate((tenant1, tenant2), 3):
            print(f"\n{step}. Listing documents for {tenant}...")
            response = requests.get(
                f"{BASE_URL}/rag/list",
                headers={"x-tenant-id": tenant},
                timeout=5
            )
            tenant_docs = response.json().get("documents", [])
            print(f"   Found {len(tenant_docs)} documents")

        # Step 5: tenant1 should be able to find its own secret.
        print("\n5. Searching for tenant1's secret as tenant1...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10
        )
        tenant1_results = response.json().get("results", [])
        tenant1_found = bool(_texts_containing(tenant1_results, secret_marker))

        print(f"   Found: {tenant1_found}")
        if tenant1_results:
            print(f"   Results count: {len(tenant1_results)}")
            first_result = tenant1_results[0]
            preview = first_result.get("text", "") if isinstance(first_result, dict) else first_result
            print(f"   First result preview: {str(preview)[:100]}...")

        # Step 6: the same search as tenant2 must NOT surface the secret.
        print("\n6. Searching for tenant1's secret as tenant2 (should NOT find)...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10
        )
        tenant2_results = response.json().get("results", [])
        tenant2_found_texts = [
            text[:100] for text in _texts_containing(tenant2_results, secret_marker)
        ]
        tenant2_found = bool(tenant2_found_texts)

        print(f"   Found: {tenant2_found}")
        print(f"   Results count: {len(tenant2_results)}")
        if tenant2_results:
            print(f"   First result preview: {str(tenant2_results[0])[:150]}")
        if tenant2_found_texts:
            print(f"   ⚠️ Found TENANT1_SECRET in {len(tenant2_found_texts)} result(s):")
            for i, text in enumerate(tenant2_found_texts, 1):
                print(f"      {i}. {text}...")

        # Verify isolation
        print("\n7. Verifying isolation...")

        # Tenant2 seeing the secret is a leak even if tenant1's own search
        # came back empty, so this is checked before anything else.
        if tenant2_found:
            print("   ❌ Tenant2 can see tenant1's secret - ISOLATION FAILED!")
            print(f"   Debug: tenant2 found {len(tenant2_found_texts)} result(s) containing TENANT1_SECRET")
            print("\n❌ RAG Isolation: FAILED")
            return False

        if tenant1_found:
            print("   ✅ Tenant1 can find their own secret")
            print("   ✅ Tenant2 cannot find tenant1's secret")
            print("\n✅ RAG Isolation: PASSED")
            return True

        print("   ⚠️ Could not verify (may need RAG server running)")
        print("\n⚠️ RAG Isolation: INCONCLUSIVE")
        return None

    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API/RAG server. Make sure they're running:")
        print("   uvicorn backend.api.main:app --port 8000")
        print("   python backend/mcp_server/server.py")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def verify_database_directly():
    """Verify tenant_id isolation by calling the storage classes directly.

    Logs one tool-usage event per tenant on ``AnalyticsStore`` and prints the
    keys of each tenant's usage stats; these are shown for manual inspection
    only and do not take part in the verdict. Then adds one rule per tenant
    on ``RulesStore`` and checks that each tenant's rules contain its own
    rule and not the other tenant's.

    NOTE(review): every run calls ``add_rule`` again for the same two
    tenants; whether the store de-duplicates is not visible from here.

    Returns:
        True when each tenant sees exactly its own rule of the two.
        False when a rule is missing or leaks, or an unexpected error occurs.
        None when the storage modules cannot be imported (check skipped).
    """
    print_section("Verifying Database Directly")

    # An unavailable backend package means the check cannot run at all; that
    # is a skip, not evidence of broken isolation.
    try:
        from api.storage.analytics_store import AnalyticsStore
        from api.storage.rules_store import RulesStore
    except ImportError as e:
        print(f"\n⚠️ Cannot import storage modules: {e}")
        print("\n⚠️ Database Direct Verification: SKIPPED")
        return None

    try:
        # Check analytics store
        print("\n1. Checking Analytics Store...")
        analytics = AnalyticsStore()

        # Log events for different tenants
        analytics.log_tool_usage("db_verify_tenant1", "rag", latency_ms=100)
        analytics.log_tool_usage("db_verify_tenant2", "web", latency_ms=200)

        # Get stats
        tenant1_stats = analytics.get_tool_usage_stats("db_verify_tenant1")
        tenant2_stats = analytics.get_tool_usage_stats("db_verify_tenant2")

        print(f"   Tenant1 stats: {list(tenant1_stats.keys())}")
        print(f"   Tenant2 stats: {list(tenant2_stats.keys())}")

        # Check rules store
        print("\n2. Checking Rules Store...")
        rules = RulesStore()

        rules.add_rule("db_verify_tenant1", "Rule 1", severity="high")
        rules.add_rule("db_verify_tenant2", "Rule 2", severity="low")

        tenant1_rules = rules.get_rules("db_verify_tenant1")
        tenant2_rules = rules.get_rules("db_verify_tenant2")

        print(f"   Tenant1 rules: {tenant1_rules}")
        print(f"   Tenant2 rules: {tenant2_rules}")

        # Verify isolation
        print("\n3. Verifying isolation...")
        tenant1_has_rule1 = "Rule 1" in tenant1_rules
        tenant1_has_rule2 = "Rule 2" in tenant1_rules
        tenant2_has_rule1 = "Rule 1" in tenant2_rules
        tenant2_has_rule2 = "Rule 2" in tenant2_rules

        # The marker after each "own rule" line follows the observed value
        # instead of always showing a check mark.
        print(f"   Tenant1 has Rule 1: {tenant1_has_rule1} {'✓' if tenant1_has_rule1 else '✗ MISSING'}")
        print(f"   Tenant1 has Rule 2: {tenant1_has_rule2} {'✗ FAILED!' if tenant1_has_rule2 else '✓ PASSED'}")
        print(f"   Tenant2 has Rule 1: {tenant2_has_rule1} {'✗ FAILED!' if tenant2_has_rule1 else '✓ PASSED'}")
        print(f"   Tenant2 has Rule 2: {tenant2_has_rule2} {'✓' if tenant2_has_rule2 else '✗ MISSING'}")

        own_rules_present = tenant1_has_rule1 and tenant2_has_rule2
        foreign_rules_present = tenant1_has_rule2 or tenant2_has_rule1

        if own_rules_present and not foreign_rules_present:
            print("\n✅ Database Direct Verification: PASSED")
            return True

        print("\n❌ Database Direct Verification: FAILED")
        return False

    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    """Run all verification tests and print a summary.

    Each check returns True (passed), False (failed) or None (skipped or
    inconclusive). All three outcomes are counted so the summary can report
    skipped checks instead of silently dropping them.

    Returns:
        1 when at least one check failed, otherwise 0. The value is intended
        to be used as the process exit status.
    """
    print("\n" + "🔍" * 30)
    print("Tenant ID Isolation Verification")
    print("🔍" * 30)

    # (announcement, check) pairs, run in this order.
    checks = (
        # Test 1: Database direct verification (no API needed)
        ("\n📊 Running database direct verification (no API required)...",
         verify_database_directly),
        # Test 2: Admin rules isolation (requires API running)
        ("\n📋 Testing admin rules isolation (requires API)...",
         verify_admin_rules_isolation),
        # Test 3: Analytics isolation (requires API running)
        ("\n📈 Testing analytics isolation (requires API)...",
         verify_analytics_isolation),
        # Test 4: RAG isolation (requires API and RAG server running)
        ("\n📚 Testing RAG document isolation (requires API + RAG server)...",
         verify_rag_isolation),
    )

    results = []
    for announcement, check in checks:
        print(announcement)
        results.append(check())

    # Summary
    print_section("Verification Summary")
    passed = sum(1 for r in results if r is True)
    failed = sum(1 for r in results if r is False)
    skipped = sum(1 for r in results if r is None)
    total = len(results) - skipped  # checks that reached a verdict

    print(f"\nTests Completed: {total}")
    print(f"✅ Passed: {passed}")
    print(f"❌ Failed: {failed}")
    print(f"⚠️ Skipped/Inconclusive: {skipped}")

    if total == 0:
        print("\n⚠️ No tests could run. Make sure services are running:")
        print("   - API: uvicorn backend.api.main:app --port 8000")
        print("   - MCP Server: python backend/mcp_server/server.py")
    elif failed > 0:
        print("\n❌ Some tenant isolation tests FAILED!")
    elif skipped == 0 and passed > 0:
        print("\n✅ All tenant isolation tests PASSED!")
    else:
        # Nothing failed, but not every check reached a verdict.
        print("\n⚠️ Some tests were inconclusive or skipped")

    return 1 if failed > 0 else 0


if __name__ == "__main__":
    # Propagate the verdict so shells and CI can detect failures.
    sys.exit(main())