""" verify_tenant_isolation.py Script to verify tenant_id is properly used for data isolation Usage: python verify_tenant_isolation.py This script tests: - Admin rules isolation - Analytics isolation - RAG document isolation - Database direct verification """ import requests import json from pathlib import Path import sys # Add backend to path backend_dir = Path(__file__).parent / "backend" sys.path.insert(0, str(backend_dir)) root_dir = Path(__file__).parent sys.path.insert(0, str(root_dir)) BASE_URL = "http://localhost:8000" def print_section(title): """Print a formatted section header""" print("\n" + "="*60) print(f" {title}") print("="*60) def verify_admin_rules_isolation(): """Verify admin rules are isolated by tenant_id""" print_section("Testing Admin Rules Isolation") tenant1 = "verify_tenant1" tenant2 = "verify_tenant2" try: # Add rules for different tenants print(f"\n1. Adding rule for {tenant1}...") response = requests.post( f"{BASE_URL}/admin/rules", headers={"x-tenant-id": tenant1, "Content-Type": "application/json"}, json={"rule": f"Rule for {tenant1}", "severity": "high"}, timeout=5 ) print(f" Status: {response.status_code}") print(f"\n2. Adding rule for {tenant2}...") response = requests.post( f"{BASE_URL}/admin/rules", headers={"x-tenant-id": tenant2, "Content-Type": "application/json"}, json={"rule": f"Rule for {tenant2}", "severity": "low"}, timeout=5 ) print(f" Status: {response.status_code}") # Get rules for tenant1 print(f"\n3. Getting rules for {tenant1}...") response = requests.get( f"{BASE_URL}/admin/rules", headers={"x-tenant-id": tenant1}, timeout=5 ) tenant1_rules = response.json().get("rules", []) print(f" Found {len(tenant1_rules)} rules") print(f" Rules: {tenant1_rules}") # Get rules for tenant2 print(f"\n4. Getting rules for {tenant2}...") response = requests.get( f"{BASE_URL}/admin/rules", headers={"x-tenant-id": tenant2}, timeout=5 ) tenant2_rules = response.json().get("rules", []) print(f" Found {len(tenant2_rules)} rules") print(f" Rules: {tenant2_rules}") # Verify isolation print("\n5. Verifying isolation...") tenant1_rule_text = f"Rule for {tenant1}" tenant2_rule_text = f"Rule for {tenant2}" tenant1_has_own_rule = tenant1_rule_text in tenant1_rules tenant1_has_other_rule = tenant2_rule_text in tenant1_rules tenant2_has_own_rule = tenant2_rule_text in tenant2_rules tenant2_has_other_rule = tenant1_rule_text in tenant2_rules print(f" Tenant1 has own rule: {tenant1_has_own_rule} ✓") print(f" Tenant1 has other's rule: {tenant1_has_other_rule} {'✗ FAILED!' if tenant1_has_other_rule else '✓ PASSED'}") print(f" Tenant2 has own rule: {tenant2_has_own_rule} ✓") print(f" Tenant2 has other's rule: {tenant2_has_other_rule} {'✗ FAILED!' if tenant2_has_other_rule else '✓ PASSED'}") if not tenant1_has_other_rule and not tenant2_has_other_rule: print("\n✅ Admin Rules Isolation: PASSED") return True else: print("\n❌ Admin Rules Isolation: FAILED") return False except requests.exceptions.ConnectionError: print("\n⚠️ Cannot connect to API. Make sure it's running:") print(" uvicorn backend.api.main:app --port 8000") return None except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() return False def verify_analytics_isolation(): """Verify analytics are isolated by tenant_id""" print_section("Testing Analytics Isolation") tenant1 = "verify_tenant1" tenant2 = "verify_tenant2" try: # Make queries for different tenants print(f"\n1. Making query as {tenant1}...") response = requests.post( f"{BASE_URL}/agent/message", json={"tenant_id": tenant1, "message": "Test query from tenant1"}, timeout=10 ) print(f" Status: {response.status_code}") print(f"\n2. Making query as {tenant2}...") response = requests.post( f"{BASE_URL}/agent/message", json={"tenant_id": tenant2, "message": "Test query from tenant2"}, timeout=10 ) print(f" Status: {response.status_code}") # Get analytics for tenant1 print(f"\n3. Getting analytics for {tenant1}...") response = requests.get( f"{BASE_URL}/analytics/overview?days=30", headers={"x-tenant-id": tenant1}, timeout=5 ) tenant1_analytics = response.json() print(f" Total queries: {tenant1_analytics.get('total_queries', 0)}") # Get analytics for tenant2 print(f"\n4. Getting analytics for {tenant2}...") response = requests.get( f"{BASE_URL}/analytics/overview?days=30", headers={"x-tenant-id": tenant2}, timeout=5 ) tenant2_analytics = response.json() print(f" Total queries: {tenant2_analytics.get('total_queries', 0)}") # Verify they're different print("\n5. Verifying isolation...") tenant1_queries = tenant1_analytics.get('total_queries', 0) tenant2_queries = tenant2_analytics.get('total_queries', 0) print(f" Tenant1 queries: {tenant1_queries}") print(f" Tenant2 queries: {tenant2_queries}") if tenant1_queries > 0 and tenant2_queries > 0: print("\n✅ Analytics Isolation: PASSED (both tenants have their own data)") return True else: print("\n⚠️ Analytics Isolation: Need more queries to verify") return True except requests.exceptions.ConnectionError: print("\n⚠️ Cannot connect to API. Make sure it's running:") print(" uvicorn backend.api.main:app --port 8000") return None except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() return False def verify_rag_isolation(): """Verify RAG documents are isolated by tenant_id""" print_section("Testing RAG Document Isolation") tenant1 = "verify_tenant1" tenant2 = "verify_tenant2" try: # Ingest documents for different tenants print(f"\n1. Ingesting document for {tenant1}...") response = requests.post( f"{BASE_URL}/rag/ingest-document", headers={"x-tenant-id": tenant1, "Content-Type": "application/json"}, json={ "content": "This is a confidential document for Tenant 1 only. Secret code: TENANT1_SECRET_12345", "source_type": "raw_text" }, timeout=10 ) print(f" Status: {response.status_code}") if response.status_code != 200: print(f" Error: {response.text}") print(f"\n2. Ingesting document for {tenant2}...") response = requests.post( f"{BASE_URL}/rag/ingest-document", headers={"x-tenant-id": tenant2, "Content-Type": "application/json"}, json={ "content": "This is a confidential document for Tenant 2 only. Secret code: TENANT2_SECRET_67890", "source_type": "raw_text" }, timeout=10 ) print(f" Status: {response.status_code}") if response.status_code != 200: print(f" Error: {response.text}") # List documents for tenant1 print(f"\n3. Listing documents for {tenant1}...") response = requests.get( f"{BASE_URL}/rag/list", headers={"x-tenant-id": tenant1}, timeout=5 ) tenant1_docs = response.json().get("documents", []) print(f" Found {len(tenant1_docs)} documents") # List documents for tenant2 print(f"\n4. Listing documents for {tenant2}...") response = requests.get( f"{BASE_URL}/rag/list", headers={"x-tenant-id": tenant2}, timeout=5 ) tenant2_docs = response.json().get("documents", []) print(f" Found {len(tenant2_docs)} documents") # Search for tenant1's secret print(f"\n5. Searching for tenant1's secret as tenant1...") response = requests.post( f"{BASE_URL}/rag/search", headers={"x-tenant-id": tenant1, "Content-Type": "application/json"}, json={"query": "TENANT1_SECRET"}, timeout=10 ) tenant1_search = response.json() # Check only the result texts, not the entire JSON (which includes the query) tenant1_results = tenant1_search.get("results", []) tenant1_found = False for result in tenant1_results: result_text = result.get("text", "") or result.get("content", "") or str(result) if "TENANT1_SECRET" in result_text: tenant1_found = True break print(f" Found: {tenant1_found}") if tenant1_results: print(f" Results count: {len(tenant1_results)}") if tenant1_results: print(f" First result preview: {str(tenant1_results[0].get('text', ''))[:100]}...") # Search for tenant1's secret as tenant2 (should NOT find it) print(f"\n6. Searching for tenant1's secret as tenant2 (should NOT find)...") response = requests.post( f"{BASE_URL}/rag/search", headers={"x-tenant-id": tenant2, "Content-Type": "application/json"}, json={"query": "TENANT1_SECRET"}, timeout=10 ) tenant2_search = response.json() # Check results more carefully tenant2_results = tenant2_search.get("results", []) tenant2_found = False tenant2_found_texts = [] for result in tenant2_results: result_text = result.get("text", "") or result.get("content", "") or str(result) if "TENANT1_SECRET" in result_text: tenant2_found = True tenant2_found_texts.append(result_text[:100]) print(f" Found: {tenant2_found}") print(f" Results count: {len(tenant2_results)}") if tenant2_results: print(f" First result preview: {str(tenant2_results[0])[:150]}") if tenant2_found_texts: print(f" ⚠️ Found TENANT1_SECRET in {len(tenant2_found_texts)} result(s):") for i, text in enumerate(tenant2_found_texts, 1): print(f" {i}. {text}...") # Verify isolation print("\n7. Verifying isolation...") if tenant1_found and not tenant2_found: print(" ✅ Tenant1 can find their own secret") print(" ✅ Tenant2 cannot find tenant1's secret") print("\n✅ RAG Isolation: PASSED") return True elif tenant1_found and tenant2_found: print(" ❌ Tenant2 can see tenant1's secret - ISOLATION FAILED!") print(f" Debug: tenant2 found {len(tenant2_found_texts)} result(s) containing TENANT1_SECRET") print("\n❌ RAG Isolation: FAILED") return False else: print(" ⚠️ Could not verify (may need RAG server running)") print("\n⚠️ RAG Isolation: INCONCLUSIVE") return None except requests.exceptions.ConnectionError: print("\n⚠️ Cannot connect to API/RAG server. Make sure they're running:") print(" uvicorn backend.api.main:app --port 8000") print(" python backend/mcp_server/server.py") return None except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() return False def verify_database_directly(): """Verify tenant_id in database directly""" print_section("Verifying Database Directly") try: from api.storage.analytics_store import AnalyticsStore from api.storage.rules_store import RulesStore # Check analytics store print("\n1. Checking Analytics Store...") analytics = AnalyticsStore() # Log events for different tenants analytics.log_tool_usage("db_verify_tenant1", "rag", latency_ms=100) analytics.log_tool_usage("db_verify_tenant2", "web", latency_ms=200) # Get stats tenant1_stats = analytics.get_tool_usage_stats("db_verify_tenant1") tenant2_stats = analytics.get_tool_usage_stats("db_verify_tenant2") print(f" Tenant1 stats: {list(tenant1_stats.keys())}") print(f" Tenant2 stats: {list(tenant2_stats.keys())}") # Check rules store print("\n2. Checking Rules Store...") rules = RulesStore() rules.add_rule("db_verify_tenant1", "Rule 1", severity="high") rules.add_rule("db_verify_tenant2", "Rule 2", severity="low") tenant1_rules = rules.get_rules("db_verify_tenant1") tenant2_rules = rules.get_rules("db_verify_tenant2") print(f" Tenant1 rules: {tenant1_rules}") print(f" Tenant2 rules: {tenant2_rules}") # Verify isolation print("\n3. Verifying isolation...") tenant1_has_rule1 = "Rule 1" in tenant1_rules tenant1_has_rule2 = "Rule 2" in tenant1_rules tenant2_has_rule1 = "Rule 1" in tenant2_rules tenant2_has_rule2 = "Rule 2" in tenant2_rules print(f" Tenant1 has Rule 1: {tenant1_has_rule1} ✓") print(f" Tenant1 has Rule 2: {tenant1_has_rule2} {'✗ FAILED!' if tenant1_has_rule2 else '✓ PASSED'}") print(f" Tenant2 has Rule 1: {tenant2_has_rule1} {'✗ FAILED!' if tenant2_has_rule1 else '✓ PASSED'}") print(f" Tenant2 has Rule 2: {tenant2_has_rule2} ✓") if tenant1_has_rule1 and not tenant1_has_rule2 and not tenant2_has_rule1 and tenant2_has_rule2: print("\n✅ Database Direct Verification: PASSED") return True else: print("\n❌ Database Direct Verification: FAILED") return False except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() return False def main(): """Run all verification tests""" print("\n" + "🔍" * 30) print("Tenant ID Isolation Verification") print("🔍" * 30) results = [] # Test 1: Database direct verification (always runs, no API needed) print("\n📊 Running database direct verification (no API required)...") result = verify_database_directly() if result is not None: results.append(result) # Test 2: Admin rules isolation (requires API running) print("\n📋 Testing admin rules isolation (requires API)...") result = verify_admin_rules_isolation() if result is not None: results.append(result) # Test 3: Analytics isolation (requires API running) print("\n📈 Testing analytics isolation (requires API)...") result = verify_analytics_isolation() if result is not None: results.append(result) # Test 4: RAG isolation (requires API and RAG server running) print("\n📚 Testing RAG document isolation (requires API + RAG server)...") result = verify_rag_isolation() if result is not None: results.append(result) # Summary print_section("Verification Summary") passed = sum(1 for r in results if r is True) failed = sum(1 for r in results if r is False) total = len(results) print(f"\nTests Completed: {total}") print(f"✅ Passed: {passed}") print(f"❌ Failed: {failed}") if total == 0: print("\n⚠️ No tests could run. Make sure services are running:") print(" - API: uvicorn backend.api.main:app --port 8000") print(" - MCP Server: python backend/mcp_server/server.py") elif failed == 0 and passed > 0: print("\n✅ All tenant isolation tests PASSED!") elif failed > 0: print("\n❌ Some tenant isolation tests FAILED!") else: print("\n⚠️ Some tests were inconclusive or skipped") if __name__ == "__main__": main()