Spaces:
Sleeping
Sleeping
"""
verify_tenant_isolation.py

Script to verify that tenant_id is properly used for data isolation.

Usage:
    python verify_tenant_isolation.py

This script tests:
- Admin rules isolation
- Analytics isolation
- RAG document isolation
- Database direct verification
"""
| import requests | |
| import json | |
| from pathlib import Path | |
| import sys | |
# Put the script's own directory and its backend/ subdirectory at the front of
# the import path (script directory first, backend/ second).
root_dir = Path(__file__).parent
backend_dir = root_dir / "backend"
sys.path[:0] = [str(root_dir), str(backend_dir)]

# Base address of the API that the HTTP-based checks talk to.
BASE_URL = "http://localhost:8000"
def print_section(title):
    """Print ``title`` framed above and below by a 60-character ``=`` rule.

    A blank line is emitted before the top rule so that sections are visually
    separated from whatever was printed before.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
def _contains_rule(rules, rule_text):
    """Return True when ``rule_text`` is present in ``rules``.

    An entry matches when it equals ``rule_text`` directly, or when it is a
    dict that holds ``rule_text`` as one of its values. The second form covers
    an API that returns rule objects instead of plain strings.
    """
    for entry in rules or []:
        if entry == rule_text:
            return True
        if isinstance(entry, dict) and rule_text in entry.values():
            return True
    return False


def verify_admin_rules_isolation():
    """Verify admin rules are isolated by tenant_id.

    Adds one distinctive rule per tenant through ``POST /admin/rules``, reads
    each tenant's rules back through ``GET /admin/rules`` and checks that
    neither tenant can see the other's rule.

    Returns:
        True if each tenant sees its own rule and not the other's.
        False if a rule leaked across tenants or an unexpected error occurred.
        None if the API is unreachable, or if a tenant's own rule was not
        returned (in which case nothing was actually verified).
    """
    print_section("Testing Admin Rules Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    tenant1_rule_text = f"Rule for {tenant1}"
    tenant2_rule_text = f"Rule for {tenant2}"

    try:
        # Add rules for different tenants
        additions = (
            (tenant1, tenant1_rule_text, "high"),
            (tenant2, tenant2_rule_text, "low"),
        )
        for step, (tenant, rule_text, severity) in enumerate(additions, 1):
            print(f"\n{step}. Adding rule for {tenant}...")
            response = requests.post(
                f"{BASE_URL}/admin/rules",
                headers={"x-tenant-id": tenant, "Content-Type": "application/json"},
                json={"rule": rule_text, "severity": severity},
                timeout=5,
            )
            print(f" Status: {response.status_code}")

        # Read each tenant's rules back
        rules_by_tenant = {}
        for step, tenant in enumerate((tenant1, tenant2), 3):
            print(f"\n{step}. Getting rules for {tenant}...")
            response = requests.get(
                f"{BASE_URL}/admin/rules",
                headers={"x-tenant-id": tenant},
                timeout=5,
            )
            if response.status_code != 200:
                print(f" Error: {response.text}")
            rules = response.json().get("rules") or []
            print(f" Found {len(rules)} rules")
            print(f" Rules: {rules}")
            rules_by_tenant[tenant] = rules

        tenant1_rules = rules_by_tenant[tenant1]
        tenant2_rules = rules_by_tenant[tenant2]

        # Verify isolation
        print("\n5. Verifying isolation...")
        tenant1_has_own_rule = _contains_rule(tenant1_rules, tenant1_rule_text)
        tenant1_has_other_rule = _contains_rule(tenant1_rules, tenant2_rule_text)
        tenant2_has_own_rule = _contains_rule(tenant2_rules, tenant2_rule_text)
        tenant2_has_other_rule = _contains_rule(tenant2_rules, tenant1_rule_text)

        # The marker on each line reflects the observed value instead of
        # always showing a check mark.
        own1 = "✅" if tenant1_has_own_rule else "⚠️ MISSING"
        own2 = "✅" if tenant2_has_own_rule else "⚠️ MISSING"
        leak1 = "❌ FAILED!" if tenant1_has_other_rule else "✅ PASSED"
        leak2 = "❌ FAILED!" if tenant2_has_other_rule else "✅ PASSED"
        print(f" Tenant1 has own rule: {tenant1_has_own_rule} {own1}")
        print(f" Tenant1 has other's rule: {tenant1_has_other_rule} {leak1}")
        print(f" Tenant2 has own rule: {tenant2_has_own_rule} {own2}")
        print(f" Tenant2 has other's rule: {tenant2_has_other_rule} {leak2}")

        if tenant1_has_other_rule or tenant2_has_other_rule:
            print("\n❌ Admin Rules Isolation: FAILED")
            return False
        if not (tenant1_has_own_rule and tenant2_has_own_rule):
            # Without the tenants' own rules in the responses, "no leak" says
            # nothing about isolation (e.g. both lists were simply empty).
            print("\n⚠️ Admin Rules Isolation: INCONCLUSIVE (own rules were not returned)")
            return None
        print("\n✅ Admin Rules Isolation: PASSED")
        return True
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_analytics_isolation():
    """Verify analytics are isolated by tenant_id.

    Sends one message per tenant to ``POST /agent/message`` and then reads
    ``GET /analytics/overview?days=30`` for each tenant.

    NOTE: the pass criterion is only that both tenants report a non-zero
    ``total_queries``. That shows each tenant has data, but does not by itself
    prove the counters are not shared.

    Returns:
        True if both tenants report at least one query.
        None if the API is unreachable or a tenant reports no queries
        (inconclusive, so it is not counted as a pass).
        False if an unexpected error occurred.
    """
    print_section("Testing Analytics Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"

    try:
        # Make queries for different tenants
        queries = (
            (tenant1, "Test query from tenant1"),
            (tenant2, "Test query from tenant2"),
        )
        for step, (tenant, message) in enumerate(queries, 1):
            print(f"\n{step}. Making query as {tenant}...")
            response = requests.post(
                f"{BASE_URL}/agent/message",
                json={"tenant_id": tenant, "message": message},
                timeout=10,
            )
            print(f" Status: {response.status_code}")

        # Get analytics for each tenant
        query_counts = []
        for step, tenant in enumerate((tenant1, tenant2), 3):
            print(f"\n{step}. Getting analytics for {tenant}...")
            response = requests.get(
                f"{BASE_URL}/analytics/overview?days=30",
                headers={"x-tenant-id": tenant},
                timeout=5,
            )
            # A missing or null counter is treated as zero.
            total_queries = response.json().get('total_queries', 0) or 0
            print(f" Total queries: {total_queries}")
            query_counts.append(total_queries)
        tenant1_queries, tenant2_queries = query_counts

        # Verify they're different
        print("\n5. Verifying isolation...")
        print(f" Tenant1 queries: {tenant1_queries}")
        print(f" Tenant2 queries: {tenant2_queries}")

        if tenant1_queries > 0 and tenant2_queries > 0:
            print("\n✅ Analytics Isolation: PASSED (both tenants have their own data)")
            return True

        # Nothing could be verified, so report inconclusive rather than a pass.
        print("\n⚠️ Analytics Isolation: Need more queries to verify")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def _rag_result_text(result):
    """Return the text of one search result as a string.

    Dict results are read from their ``text`` key, then ``content``, and fall
    back to the string form of the whole result. Any other result type is
    converted with ``str`` instead of raising.
    """
    if isinstance(result, dict):
        return str(result.get("text") or result.get("content") or result)
    return str(result)


def _rag_texts_containing(results, needle):
    """Return the texts of all ``results`` that contain ``needle``."""
    return [
        text
        for text in (_rag_result_text(result) for result in results or [])
        if needle in text
    ]


def verify_rag_isolation():
    """Verify RAG documents are isolated by tenant_id.

    Ingests one document per tenant, each carrying a tenant-specific secret
    marker, lists each tenant's documents, then searches for tenant1's marker
    first as tenant1 and then as tenant2.

    Returns:
        True if tenant1 finds its own marker and tenant2 does not.
        False if tenant2's search returns tenant1's marker (regardless of
        whether tenant1's own search found it), or on an unexpected error.
        None if the services are unreachable or tenant1 could not find its own
        marker (inconclusive).
    """
    print_section("Testing RAG Document Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    secret_marker = "TENANT1_SECRET"

    try:
        # Ingest documents for different tenants
        documents = (
            (tenant1, "This is a confidential document for Tenant 1 only. Secret code: TENANT1_SECRET_12345"),
            (tenant2, "This is a confidential document for Tenant 2 only. Secret code: TENANT2_SECRET_67890"),
        )
        for step, (tenant, content) in enumerate(documents, 1):
            print(f"\n{step}. Ingesting document for {tenant}...")
            response = requests.post(
                f"{BASE_URL}/rag/ingest-document",
                headers={"x-tenant-id": tenant, "Content-Type": "application/json"},
                json={"content": content, "source_type": "raw_text"},
                timeout=10,
            )
            print(f" Status: {response.status_code}")
            if response.status_code != 200:
                print(f" Error: {response.text}")

        # List documents for each tenant
        for step, tenant in enumerate((tenant1, tenant2), 3):
            print(f"\n{step}. Listing documents for {tenant}...")
            response = requests.get(
                f"{BASE_URL}/rag/list",
                headers={"x-tenant-id": tenant},
                timeout=5,
            )
            tenant_docs = response.json().get("documents") or []
            print(f" Found {len(tenant_docs)} documents")

        # Search for tenant1's secret
        print("\n5. Searching for tenant1's secret as tenant1...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10,
        )
        # Check only the result texts, not the entire JSON (which includes the query)
        tenant1_results = response.json().get("results") or []
        tenant1_found = bool(_rag_texts_containing(tenant1_results, secret_marker))
        print(f" Found: {tenant1_found}")
        if tenant1_results:
            first = tenant1_results[0]
            preview = first.get('text', '') if isinstance(first, dict) else first
            print(f" Results count: {len(tenant1_results)}")
            print(f" First result preview: {str(preview)[:100]}...")

        # Search for tenant1's secret as tenant2 (should NOT find it)
        print("\n6. Searching for tenant1's secret as tenant2 (should NOT find)...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10,
        )
        tenant2_results = response.json().get("results") or []
        tenant2_found_texts = [
            text[:100] for text in _rag_texts_containing(tenant2_results, secret_marker)
        ]
        tenant2_found = bool(tenant2_found_texts)
        print(f" Found: {tenant2_found}")
        print(f" Results count: {len(tenant2_results)}")
        if tenant2_results:
            print(f" First result preview: {str(tenant2_results[0])[:150]}")
        if tenant2_found_texts:
            print(f" ⚠️ Found TENANT1_SECRET in {len(tenant2_found_texts)} result(s):")
            for i, text in enumerate(tenant2_found_texts, 1):
                print(f" {i}. {text}...")

        # Verify isolation
        print("\n7. Verifying isolation...")
        if tenant2_found:
            # A leak is a failure even when tenant1's own search missed the
            # document, so this is checked before anything else.
            print(" ❌ Tenant2 can see tenant1's secret - ISOLATION FAILED!")
            print(f" Debug: tenant2 found {len(tenant2_found_texts)} result(s) containing TENANT1_SECRET")
            print("\n❌ RAG Isolation: FAILED")
            return False
        if tenant1_found:
            print(" ✅ Tenant1 can find their own secret")
            print(" ✅ Tenant2 cannot find tenant1's secret")
            print("\n✅ RAG Isolation: PASSED")
            return True
        print(" ⚠️ Could not verify (may need RAG server running)")
        print("\n⚠️ RAG Isolation: INCONCLUSIVE")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API/RAG server. Make sure they're running:")
        print(" uvicorn backend.api.main:app --port 8000")
        print(" python backend/mcp_server/server.py")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_database_directly():
    """Verify tenant_id isolation by calling the storage classes directly.

    Logs one tool-usage event per tenant through ``AnalyticsStore`` and prints
    the resulting per-tenant stats (for inspection only; they do not affect
    the verdict). Then adds one rule per tenant through ``RulesStore`` and
    checks that each tenant's rule list contains its own rule and not the
    other tenant's.

    Returns:
        True if each tenant sees exactly its own rule.
        False if a rule is missing or leaked, or an unexpected error occurred.
        None if the storage modules cannot be imported (check skipped).
    """
    print_section("Verifying Database Directly")
    try:
        try:
            from api.storage.analytics_store import AnalyticsStore
            from api.storage.rules_store import RulesStore
        except ImportError as e:
            # Missing project modules mean the check could not run at all,
            # which is different from an isolation failure.
            print(f"\n⚠️ Cannot import storage modules: {e}")
            return None

        # Check analytics store
        print("\n1. Checking Analytics Store...")
        analytics = AnalyticsStore()
        # Log events for different tenants
        analytics.log_tool_usage("db_verify_tenant1", "rag", latency_ms=100)
        analytics.log_tool_usage("db_verify_tenant2", "web", latency_ms=200)
        # Get stats
        tenant1_stats = analytics.get_tool_usage_stats("db_verify_tenant1")
        tenant2_stats = analytics.get_tool_usage_stats("db_verify_tenant2")
        print(f" Tenant1 stats: {list(tenant1_stats.keys())}")
        print(f" Tenant2 stats: {list(tenant2_stats.keys())}")

        # Check rules store
        print("\n2. Checking Rules Store...")
        rules = RulesStore()
        rules.add_rule("db_verify_tenant1", "Rule 1", severity="high")
        rules.add_rule("db_verify_tenant2", "Rule 2", severity="low")
        tenant1_rules = rules.get_rules("db_verify_tenant1")
        tenant2_rules = rules.get_rules("db_verify_tenant2")
        print(f" Tenant1 rules: {tenant1_rules}")
        print(f" Tenant2 rules: {tenant2_rules}")

        # Verify isolation
        print("\n3. Verifying isolation...")
        tenant1_has_rule1 = "Rule 1" in tenant1_rules
        tenant1_has_rule2 = "Rule 2" in tenant1_rules
        tenant2_has_rule1 = "Rule 1" in tenant2_rules
        tenant2_has_rule2 = "Rule 2" in tenant2_rules

        # The marker on each line reflects the observed value instead of
        # always showing a check mark.
        own1 = "✅" if tenant1_has_rule1 else "❌ MISSING!"
        own2 = "✅" if tenant2_has_rule2 else "❌ MISSING!"
        leak1 = "❌ FAILED!" if tenant1_has_rule2 else "✅ PASSED"
        leak2 = "❌ FAILED!" if tenant2_has_rule1 else "✅ PASSED"
        print(f" Tenant1 has Rule 1: {tenant1_has_rule1} {own1}")
        print(f" Tenant1 has Rule 2: {tenant1_has_rule2} {leak1}")
        print(f" Tenant2 has Rule 1: {tenant2_has_rule1} {leak2}")
        print(f" Tenant2 has Rule 2: {tenant2_has_rule2} {own2}")

        isolated = (
            tenant1_has_rule1
            and tenant2_has_rule2
            and not tenant1_has_rule2
            and not tenant2_has_rule1
        )
        if isolated:
            print("\n✅ Database Direct Verification: PASSED")
            return True
        print("\n❌ Database Direct Verification: FAILED")
        return False
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run all verification tests and print a summary.

    Each check returns True (passed), False (failed) or None (skipped or
    inconclusive). Skipped checks are reported separately and are not counted
    in "Tests Completed". The "all passed" verdict is printed only when
    nothing failed and nothing was skipped.

    Returns:
        1 if any check failed, otherwise 0 (suitable as a process exit code).
    """
    print("\n" + "🔍" * 30)
    print("Tenant ID Isolation Verification")
    print("🔍" * 30)

    checks = (
        # Test 1: Database direct verification (no API needed)
        ("\n🔍 Running database direct verification (no API required)...",
         verify_database_directly),
        # Test 2: Admin rules isolation (requires API running)
        ("\n🔍 Testing admin rules isolation (requires API)...",
         verify_admin_rules_isolation),
        # Test 3: Analytics isolation (requires API running)
        ("\n🔍 Testing analytics isolation (requires API)...",
         verify_analytics_isolation),
        # Test 4: RAG isolation (requires API and RAG server running)
        ("\n🔍 Testing RAG document isolation (requires API + RAG server)...",
         verify_rag_isolation),
    )
    results = []
    for announcement, check in checks:
        print(announcement)
        results.append(check())

    # Summary
    print_section("Verification Summary")
    passed = sum(1 for r in results if r is True)
    failed = sum(1 for r in results if r is False)
    skipped = sum(1 for r in results if r is None)
    total = passed + failed
    print(f"\nTests Completed: {total}")
    print(f"✅ Passed: {passed}")
    print(f"❌ Failed: {failed}")
    if skipped:
        print(f"⚠️ Skipped/Inconclusive: {skipped}")

    if total == 0:
        print("\n⚠️ No tests could run. Make sure services are running:")
        print(" - API: uvicorn backend.api.main:app --port 8000")
        print(" - MCP Server: python backend/mcp_server/server.py")
    elif failed > 0:
        print("\n❌ Some tenant isolation tests FAILED!")
    elif skipped:
        print("\n⚠️ Some tests were inconclusive or skipped")
    else:
        print("\n✅ All tenant isolation tests PASSED!")

    return 1 if failed else 0


if __name__ == "__main__":
    # Propagate failures to the shell / CI through the exit status.
    sys.exit(main())