Spaces:
Sleeping
Sleeping
| import sys | |
| import os | |
| import time | |
| # Add backend to path | |
| sys.path.append(os.path.join(os.getcwd(), 'backend')) | |
| from services.vector_store import VectorStore | |
def _has_hits(results):
    """Return True when a query result contains at least one document.

    An empty result is recognised in both shapes the checks below may meet:
    an empty ``documents`` list, or a ``documents`` list whose first inner
    list is empty.
    NOTE(review): which of the two VectorStore.find_similar_chunks returns
    for a deleted session is not visible from this file - confirm.
    """
    documents = results['documents']
    return bool(documents and documents[0])


def _first_hit(results):
    """Return the text of the top-ranked document, or "" when there is none."""
    return results['documents'][0][0] if _has_hits(results) else ""


def _report(passed, pass_message, fail_message):
    """Print the PASS or FAIL line for one check and return the outcome."""
    print(pass_message if passed else fail_message)
    return bool(passed)


def verify_isolation():
    """Check that data stored under one session is not visible to another.

    Adds one chunk to each of two sessions, queries each session, deletes the
    first session, then confirms the first session returns nothing while the
    second session still returns data. Every check prints a PASS/FAIL line.

    Returns:
        bool: True when every check passed, False if any check failed.
    """
    print("Starting Verification of Session Isolation (Collection-based)...")

    # Initialize Store
    store = VectorStore()

    # Define Sessions
    session_a = "session_A_123"
    session_b = "session_B_456"

    # Define Data: both chunks carry the same "source" metadata, so only the
    # session they are stored under tells them apart.
    doc_a = [{"text": "The capital of France is Paris.", "metadata": {"source": "geography.pdf"}}]
    doc_b = [{"text": "The capital of Spain is Madrid.", "metadata": {"source": "geography.pdf"}}]

    outcomes = []

    # 1. Add Documents
    # NOTE(review): the first argument is presumably a document id - confirm
    # against VectorStore.add_document_chunks.
    print("\n1️⃣ Adding documents...")
    store.add_document_chunks(1, doc_a, session_a)
    store.add_document_chunks(2, doc_b, session_b)

    # 2. Query Session A
    print("\n2️⃣ Querying Session A (Expect 'Paris')...")
    results_a = store.find_similar_chunks("capital", session_id=session_a, n_results=1)
    text_a = _first_hit(results_a)
    print(f" Result: {text_a}")
    outcomes.append(_report(
        "Paris" in text_a and "Madrid" not in text_a,
        " PASS: Session A sees only its own data.",
        " FAIL: Session A saw wrong data.",
    ))

    # 3. Query Session B
    print("\n3️⃣ Querying Session B (Expect 'Madrid')...")
    results_b = store.find_similar_chunks("capital", session_id=session_b, n_results=1)
    text_b = _first_hit(results_b)
    print(f" Result: {text_b}")
    outcomes.append(_report(
        "Madrid" in text_b and "Paris" not in text_b,
        " PASS: Session B sees only its own data.",
        " FAIL: Session B saw wrong data.",
    ))

    # 4. Delete Session A
    print("\n4️⃣ Deleting Session A...")
    store.delete_session_documents(session_a)

    # 5. Verify Deletion
    print("\n5️⃣ Verifying Deletion...")
    results_a_after = store.find_similar_chunks("capital", session_id=session_a, n_results=1)
    # Use the same emptiness test as every other check: a result holding one
    # empty inner list means "no data", not "data still exists".
    outcomes.append(_report(
        not _has_hits(results_a_after),
        " PASS: Session A data is gone (Collection empty/deleted).",
        " FAIL: Session A data still exists.",
    ))

    results_b_after = store.find_similar_chunks("capital", session_id=session_b, n_results=1)
    outcomes.append(_report(
        _has_hits(results_b_after),
        " PASS: Session B data still exists.",
        " FAIL: Session B data was accidentally deleted.",
    ))

    return all(outcomes)


if __name__ == "__main__":
    # Exit non-zero when any check failed so a shell or CI job can detect it.
    sys.exit(0 if verify_isolation() else 1)