Spaces:
Sleeping
Sleeping
File size: 5,017 Bytes
c509b44 78b6d7b c509b44 78b6d7b c509b44 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
"""
Diagnostic script to check RAG database tenant isolation.

This script directly queries the database to verify tenant_id isolation.
"""
import sys
from pathlib import Path
# Put the "backend" directory that sits next to this script at the front of
# sys.path so the imports performed later resolve against it first.
backend_dir = Path(__file__).parent.joinpath("backend")
sys.path.insert(0, str(backend_dir))
def _print_tenant_documents(cur, step, tenant_id):
    """Print a short listing of the documents stored for one tenant.

    Args:
        cur: Open database cursor whose rows support access by column name.
        step: Step number shown in the section heading.
        tenant_id: Tenant whose documents are listed (at most five rows).
    """
    print(f"\n{step}. Checking documents for '{tenant_id}'...")
    cur.execute(
        "SELECT id, tenant_id, LEFT(chunk_text, 50) as preview FROM documents WHERE tenant_id = %s LIMIT 5",
        (tenant_id,)
    )
    docs = cur.fetchall()
    print(f" Found {len(docs)} documents for {tenant_id}")
    for doc in docs:
        # A NULL chunk_text yields a NULL preview; show it as empty text
        # instead of aborting the whole diagnostic with an AttributeError.
        preview = (doc['preview'] or '').replace('\n', ' ')
        print(f" - ID: {doc['id']}, tenant_id: '{doc['tenant_id']}', preview: {preview[:50]}...")


def _print_search_results(results):
    """Print a one-line preview of the first two search results."""
    for i, result in enumerate(results[:2], 1):
        text_preview = result['text'][:80].replace('\n', ' ')
        print(f" Result {i}: {text_preview}...")


def _run_isolation_checks(cur):
    """Run the five diagnostic steps on an open cursor and print the verdict.

    Returns early, after printing a warning, when the documents table is empty.
    The cursor is not closed here; the caller owns it.
    """
    # Step 1: which tenant_ids exist at all, and how many rows each has.
    print("\n1. Checking all tenant_ids in database...")
    cur.execute("SELECT DISTINCT tenant_id, COUNT(*) as count FROM documents GROUP BY tenant_id")
    rows = cur.fetchall()
    if not rows:
        print(" ⚠️ No documents found in database")
        return
    print(f" Found {len(rows)} unique tenant(s):")
    for row in rows:
        print(f" - tenant_id: '{row['tenant_id']}' ({row['count']} documents)")

    # Steps 2 and 3: sample rows for each verification tenant.
    _print_tenant_documents(cur, 2, "verify_tenant1")
    _print_tenant_documents(cur, 3, "verify_tenant2")

    # Step 4: exercise the project's own search path with both tenants.
    print("\n4. Testing search_vectors function directly...")
    # Imported lazily so steps 1-3 still run if these modules fail to import.
    from mcp_server.common.embeddings import embed_text
    from mcp_server.common.database import search_vectors

    query = "TENANT1_SECRET"
    query_vector = embed_text(query)

    results_tenant1 = search_vectors("verify_tenant1", query_vector, limit=5)
    print(f" Searching for '{query}' as verify_tenant1: {len(results_tenant1)} results")
    _print_search_results(results_tenant1)

    results_tenant2 = search_vectors("verify_tenant2", query_vector, limit=5)
    print(f" Searching for '{query}' as verify_tenant2: {len(results_tenant2)} results")
    # Getting results is not a leak in itself: tenant2 may simply be seeing
    # its own documents. Only the secret marker showing up counts, and the
    # same test drives both this step and the final verdict so they agree.
    leaked = bool(results_tenant2) and query in str(results_tenant2)
    if leaked:
        print(" ⚠️ WARNING: tenant2 found tenant1's secret!")
        _print_search_results(results_tenant2)
    else:
        print(" ✅ PASSED: tenant2 cannot see tenant1's secret")

    # Step 5: per-tenant row counts for the two verification tenants.
    print("\n5. Checking for data integrity issues...")
    cur.execute("""
        SELECT tenant_id, COUNT(*) as count
        FROM documents
        WHERE tenant_id IN ('verify_tenant1', 'verify_tenant2')
        GROUP BY tenant_id
    """)
    integrity_check = cur.fetchall()
    print(" Tenant document counts:")
    for row in integrity_check:
        print(f" - {row['tenant_id']}: {row['count']} documents")

    print("\n" + "="*60)
    if leaked:
        print("❌ ISOLATION FAILED: tenant2 can see tenant1's documents")
    else:
        print("✅ Database isolation appears to be working correctly")
    print("="*60)


def check_database():
    """Check the database directly for tenant isolation.

    Opens a connection through the project's ``get_connection``, runs the
    diagnostic steps and prints the findings. The cursor and the connection
    are always closed, including when a step raises.

    Returns:
        None. All output goes to stdout; errors are reported, not raised.
    """
    print("\n" + "="*60)
    print("RAG Database Tenant Isolation Check")
    print("="*60)
    try:
        from mcp_server.common.database import get_connection
        import psycopg2.extras

        conn = get_connection()
        try:
            cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            try:
                _run_isolation_checks(cur)
            finally:
                cur.close()
        finally:
            # Runs on success, early return and failure alike, so a failing
            # query or search can no longer leak the connection.
            conn.close()
    except ImportError as e:
        print(f"\n❌ Import error: {e}")
        print(" Make sure you're running from the project root directory")
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and keep going.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
# Run the diagnostic only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    check_database()
|