Spaces:
Sleeping
Sleeping
File size: 16,980 Bytes
c509b44 78b6d7b c509b44 78b6d7b c509b44 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 |
"""
verify_tenant_isolation.py
Script to verify tenant_id is properly used for data isolation
Usage:
python verify_tenant_isolation.py
This script tests:
- Admin rules isolation
- Analytics isolation
- RAG document isolation
- Database direct verification
"""
import requests
import json
from pathlib import Path
import sys
# Make project modules importable when this script is run directly:
# first put <script dir>/backend at the front of sys.path ...
backend_dir = Path(__file__).parent / "backend"
sys.path.insert(0, str(backend_dir))
# ... then the script's own directory, which therefore ends up ahead of
# backend_dir (both are inserted at index 0).
root_dir = Path(__file__).parent
sys.path.insert(0, str(root_dir))
# Base URL that every HTTP check in this script sends its requests to.
BASE_URL = "http://localhost:8000"
def print_section(title, width=60):
    """Print a section header: a blank line, then ``title`` framed by two rules.

    Args:
        title: Text shown between the two rule lines (prefixed by one space).
        width: Number of ``=`` characters in each rule line. Defaults to 60,
            the width this script has always used.
    """
    rule = "=" * width
    print("\n" + rule)
    print(f" {title}")
    print(rule)
def _rule_strings(rules):
    """Collect every string found in a ``rules`` payload.

    Plain-string entries are taken as-is; for dict entries every string value
    is collected, so a rule is matched whichever key carries its text.
    """
    found = set()
    for entry in rules or []:
        if isinstance(entry, str):
            found.add(entry)
        elif isinstance(entry, dict):
            found.update(value for value in entry.values() if isinstance(value, str))
    return found


def verify_admin_rules_isolation():
    """Verify admin rules are isolated by tenant_id.

    Posts one rule per tenant to ``{BASE_URL}/admin/rules`` (the tenant is
    selected with the ``x-tenant-id`` header), reads each tenant's rules back
    and checks that each tenant sees its own rule and not the other tenant's.

    Returns:
        True when both tenants read back their own rule and neither sees the
        other's; False when a rule leaks across tenants or an unexpected error
        occurs; None when the API is unreachable or a tenant cannot read back
        its own rule (in which case nothing was actually verified).
    """
    print_section("Testing Admin Rules Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    tenant1_rule_text = f"Rule for {tenant1}"
    tenant2_rule_text = f"Rule for {tenant2}"
    rules_url = f"{BASE_URL}/admin/rules"
    try:
        # Add rules for different tenants
        print(f"\n1. Adding rule for {tenant1}...")
        response = requests.post(
            rules_url,
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"rule": tenant1_rule_text, "severity": "high"},
            timeout=5,
        )
        print(f" Status: {response.status_code}")

        print(f"\n2. Adding rule for {tenant2}...")
        response = requests.post(
            rules_url,
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"rule": tenant2_rule_text, "severity": "low"},
            timeout=5,
        )
        print(f" Status: {response.status_code}")

        # Get rules for tenant1
        print(f"\n3. Getting rules for {tenant1}...")
        response = requests.get(rules_url, headers={"x-tenant-id": tenant1}, timeout=5)
        tenant1_rules = response.json().get("rules", [])
        print(f" Found {len(tenant1_rules)} rules")
        print(f" Rules: {tenant1_rules}")

        # Get rules for tenant2
        print(f"\n4. Getting rules for {tenant2}...")
        response = requests.get(rules_url, headers={"x-tenant-id": tenant2}, timeout=5)
        tenant2_rules = response.json().get("rules", [])
        print(f" Found {len(tenant2_rules)} rules")
        print(f" Rules: {tenant2_rules}")

        # Verify isolation
        print("\n5. Verifying isolation...")
        tenant1_seen = _rule_strings(tenant1_rules)
        tenant2_seen = _rule_strings(tenant2_rules)
        tenant1_has_own_rule = tenant1_rule_text in tenant1_seen
        tenant1_has_other_rule = tenant2_rule_text in tenant1_seen
        tenant2_has_own_rule = tenant2_rule_text in tenant2_seen
        tenant2_has_other_rule = tenant1_rule_text in tenant2_seen
        print(f" Tenant1 has own rule: {tenant1_has_own_rule} {'✓' if tenant1_has_own_rule else '⚠️ MISSING'}")
        print(f" Tenant1 has other's rule: {tenant1_has_other_rule} {'❌ FAILED!' if tenant1_has_other_rule else '✓ PASSED'}")
        print(f" Tenant2 has own rule: {tenant2_has_own_rule} {'✓' if tenant2_has_own_rule else '⚠️ MISSING'}")
        print(f" Tenant2 has other's rule: {tenant2_has_other_rule} {'❌ FAILED!' if tenant2_has_other_rule else '✓ PASSED'}")

        # A leak is a failure no matter what else happened.
        if tenant1_has_other_rule or tenant2_has_other_rule:
            print("\n❌ Admin Rules Isolation: FAILED")
            return False
        # "No leak" only means something if each tenant's own rule was actually
        # stored and returned; two empty lists must not count as a pass.
        if not (tenant1_has_own_rule and tenant2_has_own_rule):
            print("\n⚠️ Admin Rules Isolation: INCONCLUSIVE (a tenant could not read back its own rule)")
            return None
        print("\n✅ Admin Rules Isolation: PASSED")
        return True
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        # Top-level boundary of this check: report and count it as a failure.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_analytics_isolation():
    """Verify analytics are isolated by tenant_id.

    Sends one message per tenant to ``{BASE_URL}/agent/message`` and then reads
    ``{BASE_URL}/analytics/overview?days=30`` once per tenant (tenant selected
    with the ``x-tenant-id`` header), comparing the reported ``total_queries``.

    Returns:
        True when both tenants report a positive ``total_queries``; None when
        the API is unreachable or the counts are not sufficient to conclude
        anything; False when an unexpected error occurs.
    """
    print_section("Testing Analytics Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    message_url = f"{BASE_URL}/agent/message"
    overview_url = f"{BASE_URL}/analytics/overview?days=30"
    try:
        # Make queries for different tenants
        print(f"\n1. Making query as {tenant1}...")
        response = requests.post(
            message_url,
            json={"tenant_id": tenant1, "message": "Test query from tenant1"},
            timeout=10,
        )
        print(f" Status: {response.status_code}")

        print(f"\n2. Making query as {tenant2}...")
        response = requests.post(
            message_url,
            json={"tenant_id": tenant2, "message": "Test query from tenant2"},
            timeout=10,
        )
        print(f" Status: {response.status_code}")

        # Get analytics for tenant1
        print(f"\n3. Getting analytics for {tenant1}...")
        response = requests.get(overview_url, headers={"x-tenant-id": tenant1}, timeout=5)
        tenant1_queries = response.json().get("total_queries", 0)
        print(f" Total queries: {tenant1_queries}")

        # Get analytics for tenant2
        print(f"\n4. Getting analytics for {tenant2}...")
        response = requests.get(overview_url, headers={"x-tenant-id": tenant2}, timeout=5)
        tenant2_queries = response.json().get("total_queries", 0)
        print(f" Total queries: {tenant2_queries}")

        # Verify they're different
        print("\n5. Verifying isolation...")
        print(f" Tenant1 queries: {tenant1_queries}")
        print(f" Tenant2 queries: {tenant2_queries}")
        # NOTE(review): positive counts for both tenants are treated as
        # evidence of per-tenant data; this does not by itself rule out a
        # shared counter. Consider comparing before/after counts.
        if tenant1_queries > 0 and tenant2_queries > 0:
            print("\n✅ Analytics Isolation: PASSED (both tenants have their own data)")
            return True
        # Nothing could be concluded, so report "skipped" rather than a pass.
        print("\n⚠️ Analytics Isolation: Need more queries to verify")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API. Make sure it's running:")
        print(" uvicorn backend.api.main:app --port 8000")
        return None
    except Exception as e:
        # Top-level boundary of this check: report and count it as a failure.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def _rag_result_text(result):
    """Return the text of one search hit as a string.

    Dict hits use their ``text`` value, then ``content``, then the whole hit
    rendered with ``str``; any other hit type is rendered with ``str``.
    """
    if isinstance(result, dict):
        return str(result.get("text") or result.get("content") or result)
    return str(result)


def verify_rag_isolation():
    """Verify RAG documents are isolated by tenant_id.

    Ingests one document per tenant through ``{BASE_URL}/rag/ingest-document``,
    lists each tenant's documents, then searches for tenant1's secret marker
    (``TENANT1_SECRET``) once as tenant1 and once as tenant2. The tenant is
    selected with the ``x-tenant-id`` header.

    Returns:
        False when tenant2's search returns tenant1's secret (a leak, whatever
        tenant1's own search returned) or an unexpected error occurs; True when
        tenant1 finds its secret and tenant2 does not; None when neither search
        finds it or the API is unreachable.
    """
    print_section("Testing RAG Document Isolation")
    tenant1 = "verify_tenant1"
    tenant2 = "verify_tenant2"
    secret_marker = "TENANT1_SECRET"
    try:
        # Ingest documents for different tenants
        print(f"\n1. Ingesting document for {tenant1}...")
        response = requests.post(
            f"{BASE_URL}/rag/ingest-document",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={
                "content": "This is a confidential document for Tenant 1 only. Secret code: TENANT1_SECRET_12345",
                "source_type": "raw_text",
            },
            timeout=10,
        )
        print(f" Status: {response.status_code}")
        if response.status_code != 200:
            print(f" Error: {response.text}")

        print(f"\n2. Ingesting document for {tenant2}...")
        response = requests.post(
            f"{BASE_URL}/rag/ingest-document",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={
                "content": "This is a confidential document for Tenant 2 only. Secret code: TENANT2_SECRET_67890",
                "source_type": "raw_text",
            },
            timeout=10,
        )
        print(f" Status: {response.status_code}")
        if response.status_code != 200:
            print(f" Error: {response.text}")

        # List documents for tenant1
        print(f"\n3. Listing documents for {tenant1}...")
        response = requests.get(
            f"{BASE_URL}/rag/list",
            headers={"x-tenant-id": tenant1},
            timeout=5,
        )
        tenant1_docs = response.json().get("documents", [])
        print(f" Found {len(tenant1_docs)} documents")

        # List documents for tenant2
        print(f"\n4. Listing documents for {tenant2}...")
        response = requests.get(
            f"{BASE_URL}/rag/list",
            headers={"x-tenant-id": tenant2},
            timeout=5,
        )
        tenant2_docs = response.json().get("documents", [])
        print(f" Found {len(tenant2_docs)} documents")

        # Search for tenant1's secret
        print("\n5. Searching for tenant1's secret as tenant1...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant1, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10,
        )
        # Check only the result texts, not the entire JSON (which includes the query)
        tenant1_results = response.json().get("results", [])
        tenant1_found = any(
            secret_marker in _rag_result_text(result) for result in tenant1_results
        )
        print(f" Found: {tenant1_found}")
        if tenant1_results:
            first_hit = tenant1_results[0]
            preview = first_hit.get("text", "") if isinstance(first_hit, dict) else first_hit
            print(f" Results count: {len(tenant1_results)}")
            print(f" First result preview: {str(preview)[:100]}...")

        # Search for tenant1's secret as tenant2 (should NOT find it)
        print("\n6. Searching for tenant1's secret as tenant2 (should NOT find)...")
        response = requests.post(
            f"{BASE_URL}/rag/search",
            headers={"x-tenant-id": tenant2, "Content-Type": "application/json"},
            json={"query": secret_marker},
            timeout=10,
        )
        tenant2_results = response.json().get("results", [])
        # Keep a short preview of every leaking hit for the debug output below.
        tenant2_found_texts = [
            text[:100]
            for text in map(_rag_result_text, tenant2_results)
            if secret_marker in text
        ]
        tenant2_found = bool(tenant2_found_texts)
        print(f" Found: {tenant2_found}")
        print(f" Results count: {len(tenant2_results)}")
        if tenant2_results:
            print(f" First result preview: {str(tenant2_results[0])[:150]}")
        if tenant2_found_texts:
            print(f" ⚠️ Found TENANT1_SECRET in {len(tenant2_found_texts)} result(s):")
            for i, text in enumerate(tenant2_found_texts, 1):
                print(f" {i}. {text}...")

        # Verify isolation
        print("\n7. Verifying isolation...")
        # A leak is decided first: tenant2 seeing the secret is a failure even
        # if tenant1's own search happened to miss it.
        if tenant2_found:
            print(" ❌ Tenant2 can see tenant1's secret - ISOLATION FAILED!")
            print(f" Debug: tenant2 found {len(tenant2_found_texts)} result(s) containing TENANT1_SECRET")
            print("\n❌ RAG Isolation: FAILED")
            return False
        if tenant1_found:
            print(" ✅ Tenant1 can find their own secret")
            print(" ✅ Tenant2 cannot find tenant1's secret")
            print("\n✅ RAG Isolation: PASSED")
            return True
        print(" ⚠️ Could not verify (may need RAG server running)")
        print("\n⚠️ RAG Isolation: INCONCLUSIVE")
        return None
    except requests.exceptions.ConnectionError:
        print("\n⚠️ Cannot connect to API/RAG server. Make sure they're running:")
        print(" uvicorn backend.api.main:app --port 8000")
        print(" python backend/mcp_server/server.py")
        return None
    except Exception as e:
        # Top-level boundary of this check: report and count it as a failure.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def verify_database_directly():
    """Verify tenant_id isolation by calling the storage classes directly.

    Logs one tool-usage event and adds one rule for each of the tenants
    ``db_verify_tenant1`` and ``db_verify_tenant2`` through ``AnalyticsStore``
    and ``RulesStore``, then reads the data back per tenant. Only the rules
    read-back decides the verdict; the analytics stats are printed for
    inspection. Note that this writes test records into the stores.

    Returns:
        True when each tenant's rules contain its own rule and not the other
        tenant's; False otherwise or on an unexpected error; None when the
        backend storage modules cannot be imported (the check could not run).
    """
    print_section("Verifying Database Directly")
    try:
        from api.storage.analytics_store import AnalyticsStore
        from api.storage.rules_store import RulesStore
    except ImportError as e:
        # An import problem is an environment issue, not an isolation failure.
        print(f"\n⚠️ Cannot import backend storage modules: {e}")
        return None

    try:
        # Check analytics store
        print("\n1. Checking Analytics Store...")
        analytics = AnalyticsStore()
        # Log events for different tenants
        analytics.log_tool_usage("db_verify_tenant1", "rag", latency_ms=100)
        analytics.log_tool_usage("db_verify_tenant2", "web", latency_ms=200)
        # Get stats
        tenant1_stats = analytics.get_tool_usage_stats("db_verify_tenant1")
        tenant2_stats = analytics.get_tool_usage_stats("db_verify_tenant2")
        print(f" Tenant1 stats: {list(tenant1_stats.keys())}")
        print(f" Tenant2 stats: {list(tenant2_stats.keys())}")

        # Check rules store
        print("\n2. Checking Rules Store...")
        rules = RulesStore()
        rules.add_rule("db_verify_tenant1", "Rule 1", severity="high")
        rules.add_rule("db_verify_tenant2", "Rule 2", severity="low")
        tenant1_rules = rules.get_rules("db_verify_tenant1")
        tenant2_rules = rules.get_rules("db_verify_tenant2")
        print(f" Tenant1 rules: {tenant1_rules}")
        print(f" Tenant2 rules: {tenant2_rules}")

        # Verify isolation
        # NOTE(review): the membership tests assume get_rules returns a
        # collection containing the rule strings themselves; confirm against
        # RulesStore.
        print("\n3. Verifying isolation...")
        tenant1_has_rule1 = "Rule 1" in tenant1_rules
        tenant1_has_rule2 = "Rule 2" in tenant1_rules
        tenant2_has_rule1 = "Rule 1" in tenant2_rules
        tenant2_has_rule2 = "Rule 2" in tenant2_rules
        print(f" Tenant1 has Rule 1: {tenant1_has_rule1} {'✓' if tenant1_has_rule1 else '❌ MISSING'}")
        print(f" Tenant1 has Rule 2: {tenant1_has_rule2} {'❌ FAILED!' if tenant1_has_rule2 else '✓ PASSED'}")
        print(f" Tenant2 has Rule 1: {tenant2_has_rule1} {'❌ FAILED!' if tenant2_has_rule1 else '✓ PASSED'}")
        print(f" Tenant2 has Rule 2: {tenant2_has_rule2} {'✓' if tenant2_has_rule2 else '❌ MISSING'}")

        isolated = (
            tenant1_has_rule1
            and tenant2_has_rule2
            and not tenant1_has_rule2
            and not tenant2_has_rule1
        )
        if isolated:
            print("\n✅ Database Direct Verification: PASSED")
            return True
        print("\n❌ Database Direct Verification: FAILED")
        return False
    except Exception as e:
        # Top-level boundary of this check: report and count it as a failure.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run all verification checks and print a summary.

    Each check returns True (passed), False (failed) or None (skipped or
    inconclusive). Skipped checks are counted separately so that a run in
    which some checks could not execute is not reported as "all passed".

    Returns:
        int: 1 when at least one check failed, otherwise 0. Used as the
        process exit status when the file is run as a script.
    """
    # NOTE(review): "π" looks like a mis-decoded emoji; the original glyph
    # cannot be recovered from this copy of the file, so it is kept as found.
    print("\n" + "π" * 30)
    print("Tenant ID Isolation Verification")
    print("π" * 30)

    checks = [
        # Test 1: Database direct verification (no API needed)
        ("\nπ Running database direct verification (no API required)...", verify_database_directly),
        # Test 2: Admin rules isolation (requires API running)
        ("\nπ Testing admin rules isolation (requires API)...", verify_admin_rules_isolation),
        # Test 3: Analytics isolation (requires API running)
        ("\nπ Testing analytics isolation (requires API)...", verify_analytics_isolation),
        # Test 4: RAG isolation (requires API and RAG server running)
        ("\nπ Testing RAG document isolation (requires API + RAG server)...", verify_rag_isolation),
    ]

    results = []
    skipped = 0
    for banner, check in checks:
        print(banner)
        outcome = check()
        if outcome is None:
            skipped += 1
        else:
            results.append(outcome)

    # Summary
    print_section("Verification Summary")
    passed = sum(1 for r in results if r is True)
    failed = sum(1 for r in results if r is False)
    total = len(results)
    print(f"\nTests Completed: {total}")
    print(f"✅ Passed: {passed}")
    print(f"❌ Failed: {failed}")
    print(f"⚠️ Skipped/Inconclusive: {skipped}")

    if failed > 0:
        print("\n❌ Some tenant isolation tests FAILED!")
    elif total == 0:
        print("\n⚠️ No tests could run. Make sure services are running:")
        print(" - API: uvicorn backend.api.main:app --port 8000")
        print(" - MCP Server: python backend/mcp_server/server.py")
    elif skipped > 0:
        print("\n⚠️ Some tests were inconclusive or skipped")
    else:
        print("\n✅ All tenant isolation tests PASSED!")
    return 1 if failed > 0 else 0


if __name__ == "__main__":
    # Propagate the verdict to the shell so failures are visible to callers.
    sys.exit(main())
|