# IntegraChat / verify_tenant_isolation.py (revision 78b6d7b)
"""
verify_tenant_isolation.py
Script to verify tenant_id is properly used for data isolation
Usage:
python verify_tenant_isolation.py
This script tests:
- Admin rules isolation
- Analytics isolation
- RAG document isolation
- Database direct verification
"""
import requests
import json
from pathlib import Path
import sys
# Make project modules importable relative to this script's location.
# Insert order matters: "backend" is inserted first and the script's own
# directory second, so the script directory ends up ahead of "backend".
root_dir = Path(__file__).parent
backend_dir = root_dir / "backend"
for _import_root in (backend_dir, root_dir):
    sys.path.insert(0, str(_import_root))

# Base address of the HTTP API that the checks below talk to.
BASE_URL = "http://localhost:8000"
def print_section(title):
    """Print ``title`` framed above and below by a 60-character '=' rule.

    A blank line is emitted before the top rule so sections are visually
    separated from whatever was printed before.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(f" {title}")
    print(rule)
def _rule_texts(rules):
    """Return the rule strings found in a ``rules`` list from the API.

    String entries are used as-is. Dict entries contribute their ``"rule"``
    value (assumes the API echoes the key used in the POST payload -- TODO
    confirm against the backend). ``None`` is treated as an empty list.
    """
    texts = []
    for entry in rules or []:
        if isinstance(entry, dict):
            texts.append(str(entry.get("rule", "")))
        else:
            texts.append(str(entry))
    return texts


def verify_admin_rules_isolation():
    """Verify admin rules are isolated by tenant_id.

    Posts one distinct rule per tenant to ``/admin/rules`` (tenant selected
    through the ``x-tenant-id`` header), reads each tenant's rules back and
    checks that every tenant sees its own rule and not the other tenant's.

    Returns:
        True  -- each tenant sees its own rule and none of the other's.
        False -- a tenant sees the other tenant's rule, or an unexpected
                 error occurred.
        None  -- the API is unreachable, or a tenant's own rule was not
                 returned, so isolation could not be judged either way.
    """
    print_section("Testing Admin Rules Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    passed_mark = 'βœ“ PASSED'
    failed_mark = 'βœ— FAILED!'
    try:
        # Add rules for different tenants
        print(f"\n1. Adding rule for {tenant1}...")
        response = requests.post(
            f"{BASE_URL}/admin/rules",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"rule": f"Rule for {tenant1}", "severity": "high"},
            timeout=5
        )
        print(f" Status: {response.status_code}")

        print(f"\n2. Adding rule for {tenant2}...")
        response = requests.post(
            f"{BASE_URL}/admin/rules",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"rule": f"Rule for {tenant2}", "severity": "low"},
            timeout=5
        )
        print(f" Status: {response.status_code}")

        # Get rules for tenant1
        print(f"\n3. Getting rules for {tenant1}...")
        response = requests.get(
            f"{BASE_URL}/admin/rules",
            headers={"x-tenant-id": tenant1},
            timeout=5
        )
        # "or []" guards against an explicit null in the payload.
        tenant1_rules = response.json().get("rules", []) or []
        print(f" Found {len(tenant1_rules)} rules")
        print(f" Rules: {tenant1_rules}")

        # Get rules for tenant2
        print(f"\n4. Getting rules for {tenant2}...")
        response = requests.get(
            f"{BASE_URL}/admin/rules",
            headers={"x-tenant-id": tenant2},
            timeout=5
        )
        tenant2_rules = response.json().get("rules", []) or []
        print(f" Found {len(tenant2_rules)} rules")
        print(f" Rules: {tenant2_rules}")

        # Verify isolation
        print("\n5. Verifying isolation...")
        tenant1_rule_text = f"Rule for {tenant1}"
        tenant2_rule_text = f"Rule for {tenant2}"
        tenant1_texts = _rule_texts(tenant1_rules)
        tenant2_texts = _rule_texts(tenant2_rules)
        tenant1_has_own_rule = tenant1_rule_text in tenant1_texts
        tenant1_has_other_rule = tenant2_rule_text in tenant1_texts
        tenant2_has_own_rule = tenant2_rule_text in tenant2_texts
        tenant2_has_other_rule = tenant1_rule_text in tenant2_texts

        # The mark now reflects the actual value instead of always showing a
        # check mark next to a possibly False "own rule" result.
        print(f" Tenant1 has own rule: {tenant1_has_own_rule} {passed_mark if tenant1_has_own_rule else failed_mark}")
        print(f" Tenant1 has other's rule: {tenant1_has_other_rule} {failed_mark if tenant1_has_other_rule else passed_mark}")
        print(f" Tenant2 has own rule: {tenant2_has_own_rule} {passed_mark if tenant2_has_own_rule else failed_mark}")
        print(f" Tenant2 has other's rule: {tenant2_has_other_rule} {failed_mark if tenant2_has_other_rule else passed_mark}")

        # A cross-tenant rule is a definite isolation failure.
        if tenant1_has_other_rule or tenant2_has_other_rule:
            print("\n❌ Admin Rules Isolation: FAILED")
            return False

        # Without each tenant's own rule the "no leak" result is vacuous
        # (e.g. both lists empty because the POSTs were rejected), so do not
        # report a pass.
        if not (tenant1_has_own_rule and tenant2_has_own_rule):
            print("\n⚠️ Admin Rules Isolation: INCONCLUSIVE (a tenant's own rule was not returned)")
            return None

        print("\nβœ… Admin Rules Isolation: PASSED")
        return True
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_analytics_isolation():
    """Verify analytics are isolated by tenant_id.

    Sends one ``/agent/message`` request per tenant, then reads
    ``/analytics/overview?days=30`` for each tenant and compares the
    ``total_queries`` counters.

    Note: this only confirms that each tenant has recorded queries of its
    own; it cannot prove that one tenant's queries are not also counted for
    the other.

    Returns:
        True  -- both tenants report at least one query.
        None  -- the API is unreachable, or at least one tenant reports no
                 queries, so the check is inconclusive.
        False -- an unexpected error occurred.
    """
    print_section("Testing Analytics Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    try:
        # Make queries for different tenants
        print(f"\n1. Making query as {tenant1}...")
        response = requests.post(
            f"{BASE_URL}/agent/message",
            json={"tenant_id": tenant1, "message": "Test query from tenant1"},
            timeout=10
        )
        print(f" Status: {response.status_code}")

        print(f"\n2. Making query as {tenant2}...")
        response = requests.post(
            f"{BASE_URL}/agent/message",
            json={"tenant_id": tenant2, "message": "Test query from tenant2"},
            timeout=10
        )
        print(f" Status: {response.status_code}")

        # Get analytics for tenant1
        print(f"\n3. Getting analytics for {tenant1}...")
        response = requests.get(
            f"{BASE_URL}/analytics/overview?days=30",
            headers={"x-tenant-id": tenant1},
            timeout=5
        )
        tenant1_analytics = response.json()
        print(f" Total queries: {tenant1_analytics.get('total_queries', 0)}")

        # Get analytics for tenant2
        print(f"\n4. Getting analytics for {tenant2}...")
        response = requests.get(
            f"{BASE_URL}/analytics/overview?days=30",
            headers={"x-tenant-id": tenant2},
            timeout=5
        )
        tenant2_analytics = response.json()
        print(f" Total queries: {tenant2_analytics.get('total_queries', 0)}")

        # Verify they're different
        print("\n5. Verifying isolation...")
        # "or 0" keeps the comparison below valid if the API returns null.
        tenant1_queries = tenant1_analytics.get('total_queries', 0) or 0
        tenant2_queries = tenant2_analytics.get('total_queries', 0) or 0
        print(f" Tenant1 queries: {tenant1_queries}")
        print(f" Tenant2 queries: {tenant2_queries}")

        if tenant1_queries > 0 and tenant2_queries > 0:
            print("\nβœ… Analytics Isolation: PASSED (both tenants have their own data)")
            return True

        # Nothing was verified here, so report "inconclusive" (None) rather
        # than counting the check as a pass.
        print("\n⚠️ Analytics Isolation: Need more queries to verify")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def _rag_result_text(result):
    """Return the searchable text of one RAG search result.

    For dict results the ``"text"`` field is preferred, then ``"content"``;
    if both are empty, or the result is not a dict, its ``str()`` form is
    used so the secret marker can still be detected.
    """
    if isinstance(result, dict):
        return str(result.get("text", "") or result.get("content", "") or result)
    return str(result)


def verify_rag_isolation():
    """Verify RAG documents are isolated by tenant_id.

    Ingests one document per tenant, each containing a tenant-specific
    secret marker, lists each tenant's documents, then searches for
    tenant1's marker first as tenant1 and then as tenant2.

    Returns:
        False -- tenant2's search results contain tenant1's secret (a leak,
                 regardless of whether tenant1 itself found it), or an
                 unexpected error occurred.
        True  -- tenant1 finds its own secret and tenant2 does not.
        None  -- the servers are unreachable, or neither tenant found the
                 secret, so isolation could not be judged.
    """
    print_section("Testing RAG Document Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    secret_marker = "TENANT1_SECRET"
    try:
        # Ingest documents for different tenants
        print(f"\n1. Ingesting document for {tenant1}...")
        response = requests.post(
            f"{BASE_URL}/rag/ingest-document",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={
                "content": "This is a confidential document for Tenant 1 only. Secret code: TENANT1_SECRET_12345",
                "source_type": "raw_text"
            },
            timeout=10
        )
        print(f" Status: {response.status_code}")
        if response.status_code != 200:
            print(f" Error: {response.text}")

        print(f"\n2. Ingesting document for {tenant2}...")
        response = requests.post(
            f"{BASE_URL}/rag/ingest-document",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={
                "content": "This is a confidential document for Tenant 2 only. Secret code: TENANT2_SECRET_67890",
                "source_type": "raw_text"
            },
            timeout=10
        )
        print(f" Status: {response.status_code}")
        if response.status_code != 200:
            print(f" Error: {response.text}")

        # List documents for tenant1
        print(f"\n3. Listing documents for {tenant1}...")
        response = requests.get(
            f"{BASE_URL}/rag/list",
            headers={"x-tenant-id": tenant1},
            timeout=5
        )
        tenant1_docs = response.json().get("documents", []) or []
        print(f" Found {len(tenant1_docs)} documents")

        # List documents for tenant2
        print(f"\n4. Listing documents for {tenant2}...")
        response = requests.get(
            f"{BASE_URL}/rag/list",
            headers={"x-tenant-id": tenant2},
            timeout=5
        )
        tenant2_docs = response.json().get("documents", []) or []
        print(f" Found {len(tenant2_docs)} documents")

        # Search for tenant1's secret
        print("\n5. Searching for tenant1's secret as tenant1...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10
        )
        # Check only the result texts, not the entire JSON (which includes the query)
        tenant1_results = response.json().get("results", []) or []
        tenant1_found = any(
            secret_marker in _rag_result_text(result) for result in tenant1_results
        )
        print(f" Found: {tenant1_found}")
        if tenant1_results:
            first = tenant1_results[0]
            preview = first.get('text', '') if isinstance(first, dict) else first
            print(f" Results count: {len(tenant1_results)}")
            print(f" First result preview: {str(preview)[:100]}...")

        # Search for tenant1's secret as tenant2 (should NOT find it)
        print("\n6. Searching for tenant1's secret as tenant2 (should NOT find)...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10
        )
        tenant2_results = response.json().get("results", []) or []
        # Keep a short excerpt of every leaking result for the debug output.
        tenant2_found_texts = [
            text[:100]
            for text in map(_rag_result_text, tenant2_results)
            if secret_marker in text
        ]
        tenant2_found = bool(tenant2_found_texts)
        print(f" Found: {tenant2_found}")
        print(f" Results count: {len(tenant2_results)}")
        if tenant2_results:
            print(f" First result preview: {str(tenant2_results[0])[:150]}")
        if tenant2_found_texts:
            print(f" ⚠️ Found TENANT1_SECRET in {len(tenant2_found_texts)} result(s):")
            for i, text in enumerate(tenant2_found_texts, 1):
                print(f" {i}. {text}...")

        # Verify isolation
        print("\n7. Verifying isolation...")
        # A hit for tenant2 is a leak even when tenant1's own search came
        # back empty, so this is checked before anything else.
        if tenant2_found:
            print(" ❌ Tenant2 can see tenant1's secret - ISOLATION FAILED!")
            print(f" Debug: tenant2 found {len(tenant2_found_texts)} result(s) containing TENANT1_SECRET")
            print("\n❌ RAG Isolation: FAILED")
            return False
        if tenant1_found:
            print(" βœ… Tenant1 can find their own secret")
            print(" βœ… Tenant2 cannot find tenant1's secret")
            print("\nβœ… RAG Isolation: PASSED")
            return True
        print(" ⚠️ Could not verify (may need RAG server running)")
        print("\n⚠️ RAG Isolation: INCONCLUSIVE")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API/RAG server. Make sure they're running:")
        print(" uvicorn backend.api.main:app --port 8000")
        print(" python backend/mcp_server/server.py")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_database_directly():
    """Verify tenant_id isolation by calling the storage classes directly.

    Logs one tool-usage event per tenant through ``AnalyticsStore`` and
    prints the keys of each tenant's stats, then adds one rule per tenant
    through ``RulesStore`` and checks that each tenant's rule list contains
    its own rule and not the other tenant's.

    Returns:
        True when both tenants see exactly their own rule, False otherwise
        or when any exception (including a failed import) is raised.
    """
    print_section("Verifying Database Directly")
    try:
        from api.storage.analytics_store import AnalyticsStore
        from api.storage.rules_store import RulesStore

        first_tenant = "db_verify_tenant1"
        second_tenant = "db_verify_tenant2"

        # Analytics store: record one event per tenant, then read stats back.
        print("\n1. Checking Analytics Store...")
        analytics_store = AnalyticsStore()
        analytics_store.log_tool_usage(first_tenant, "rag", latency_ms=100)
        analytics_store.log_tool_usage(second_tenant, "web", latency_ms=200)
        first_stats = analytics_store.get_tool_usage_stats(first_tenant)
        second_stats = analytics_store.get_tool_usage_stats(second_tenant)
        print(f" Tenant1 stats: {list(first_stats.keys())}")
        print(f" Tenant2 stats: {list(second_stats.keys())}")

        # Rules store: add one rule per tenant, then read each list back.
        print("\n2. Checking Rules Store...")
        rules_store = RulesStore()
        rules_store.add_rule(first_tenant, "Rule 1", severity="high")
        rules_store.add_rule(second_tenant, "Rule 2", severity="low")
        first_rules = rules_store.get_rules(first_tenant)
        second_rules = rules_store.get_rules(second_tenant)
        print(f" Tenant1 rules: {first_rules}")
        print(f" Tenant2 rules: {second_rules}")

        # Membership checks: "own" rules must be present, "leaked" absent.
        print("\n3. Verifying isolation...")
        first_owns = "Rule 1" in first_rules
        first_leaked = "Rule 2" in first_rules
        second_leaked = "Rule 1" in second_rules
        second_owns = "Rule 2" in second_rules

        first_leak_mark = 'βœ— FAILED!' if first_leaked else 'βœ“ PASSED'
        second_leak_mark = 'βœ— FAILED!' if second_leaked else 'βœ“ PASSED'
        print(f" Tenant1 has Rule 1: {first_owns} βœ“")
        print(f" Tenant1 has Rule 2: {first_leaked} {first_leak_mark}")
        print(f" Tenant2 has Rule 1: {second_leaked} {second_leak_mark}")
        print(f" Tenant2 has Rule 2: {second_owns} βœ“")

        isolated = first_owns and second_owns and not (first_leaked or second_leaked)
        if not isolated:
            print("\n❌ Database Direct Verification: FAILED")
            return False
        print("\nβœ… Database Direct Verification: PASSED")
        return True
    except Exception as exc:
        print(f"\n❌ Error: {exc}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run all verification tests, print a summary, and return an exit status.

    Each check returns True (passed), False (failed) or None (skipped or
    inconclusive). All three outcomes are counted, so a run in which some
    checks were skipped is not reported as "all passed".

    Returns:
        1 if at least one check failed, otherwise 0.
    """
    print("\n" + "πŸ”" * 30)
    print("Tenant ID Isolation Verification")
    print("πŸ”" * 30)

    # (banner printed before the check, check function), in execution order.
    checks = [
        # Test 1: Database direct verification (always runs, no API needed)
        ("\nπŸ“Š Running database direct verification (no API required)...",
         verify_database_directly),
        # Test 2: Admin rules isolation (requires API running)
        ("\nπŸ“‹ Testing admin rules isolation (requires API)...",
         verify_admin_rules_isolation),
        # Test 3: Analytics isolation (requires API running)
        ("\nπŸ“ˆ Testing analytics isolation (requires API)...",
         verify_analytics_isolation),
        # Test 4: RAG isolation (requires API and RAG server running)
        ("\nπŸ“š Testing RAG document isolation (requires API + RAG server)...",
         verify_rag_isolation),
    ]
    outcomes = []
    for banner, check in checks:
        print(banner)
        outcomes.append(check())

    # Summary
    print_section("Verification Summary")
    passed = sum(1 for r in outcomes if r is True)
    failed = sum(1 for r in outcomes if r is False)
    skipped = sum(1 for r in outcomes if r is None)
    total = passed + failed  # checks that reached a definite verdict
    print(f"\nTests Completed: {total}")
    print(f"βœ… Passed: {passed}")
    print(f"❌ Failed: {failed}")
    print(f"⚠️ Skipped/Inconclusive: {skipped}")

    if total == 0:
        print("\n⚠️ No tests could run. Make sure services are running:")
        print(" - API: uvicorn backend.api.main:app --port 8000")
        print(" - MCP Server: python backend/mcp_server/server.py")
    elif failed > 0:
        print("\n❌ Some tenant isolation tests FAILED!")
    elif skipped > 0:
        print("\n⚠️ Some tests were inconclusive or skipped")
    else:
        print("\nβœ… All tenant isolation tests PASSED!")

    return 1 if failed else 0


if __name__ == "__main__":
    # Propagate failures to the shell so callers/CI can detect them.
    sys.exit(main())