import logging
import os
import shutil
import sys
import time

import chromadb
from chromadb.config import Settings

# Add the root directory to the Python path to allow for absolute imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from vector_db_manager import run_etl_pipeline
from document_processor import prepare_product_documents, prepare_review_documents
# Configure logging to display info level messages
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# --- Test Configuration ---
# Use a lightweight model for fast testing
TEST_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'
# Use a temporary directory for the test database; created by the ETL run and
# removed again during cleanup, so it must not point at real data.
TEST_DB_PATH = "./chroma_db_test"
def test_vector_db_population():
    """
    Tests the full ETL pipeline: chunking, embedding, and populating ChromaDB.

    Runs the pipeline against the project's JSON data files into a temporary
    database directory, then verifies that the 'products' and 'reviews'
    collections exist, contain the expected number of documents, and answer
    sample similarity queries. The test database is always removed afterwards.

    Raises:
        AssertionError: If the database directory, a collection, a document
            count, or a sample query result does not match expectations.
    """
    logger.info("--- Starting Test: Vector DB Population ---")

    # Resolve the data files relative to the project root (one level up).
    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    products_file = os.path.join(base_dir, 'products.json')
    reviews_file = os.path.join(base_dir, 'product_reviews.json')

    db_manager = None
    client = None

    # Start from a clean slate: a leftover database from a previous run would
    # skew the document-count assertions below.
    if os.path.exists(TEST_DB_PATH):
        logger.warning(f"Removing existing test database at: {TEST_DB_PATH}")
        shutil.rmtree(TEST_DB_PATH)

    try:
        # --- 1. Run the ETL Pipeline ---
        logger.info(f"Running ETL pipeline with model '{TEST_MODEL}' into '{TEST_DB_PATH}'")
        db_manager = run_etl_pipeline(
            products_file=products_file,
            reviews_file=reviews_file,
            db_path=TEST_DB_PATH,
            model_name=TEST_MODEL
        )
        logger.info("ETL Pipeline finished.")

        # --- 2. Verification Step ---
        logger.info("\n--- Verifying Database Content ---")
        assert os.path.exists(TEST_DB_PATH), "Database directory was not created."

        # allow_reset=True is required so client.reset() works during cleanup.
        client = chromadb.PersistentClient(
            path=TEST_DB_PATH,
            settings=Settings(allow_reset=True)
        )

        _verify_collection(
            client,
            name="products",
            expected_count=len(prepare_product_documents(products_file)),
            sample_query="lightweight laptop",
        )
        _verify_collection(
            client,
            name="reviews",
            expected_count=len(prepare_review_documents(reviews_file, products_file)),
            sample_query="disappointed with battery",
        )
    finally:
        # --- 3. Cleanup Step ---
        # BUG FIX: the original shut down db_manager/client both at the end of
        # the try block AND here, invoking shutdown()/reset() twice on the same
        # objects. Shut them down exactly once, in the finally block, so the
        # file locks are also released when a verification assertion fails.
        logger.info("\n--- Cleaning up test database ---")
        if db_manager:
            logger.info("Shutting down ETL client...")
            db_manager.shutdown()
        if client:
            logger.info("Shutting down verification client...")
            client.reset()
        _remove_test_db_with_retry(TEST_DB_PATH)

    # Only reached when no assertion fired (the original logged success even
    # on failure because the message sat in the cleanup path).
    logger.info("\n--- Test Finished Successfully ---")


def _verify_collection(client, name, expected_count, sample_query):
    """Assert that collection *name* exists, holds *expected_count* documents,
    and returns at least one result for *sample_query*."""
    logger.info(f"[Verifying '{name}' collection...]")
    collection = client.get_collection(name=name)
    assert collection is not None, f"'{name}' collection not found."

    actual_count = collection.count()
    assert actual_count == expected_count, \
        f"Expected {expected_count} {name}, but found {actual_count} in DB."
    logger.info(f"SUCCESS: '{name}' collection count is correct ({actual_count}).")

    # Perform a sample query to prove the embeddings are queryable.
    query_result = collection.query(query_texts=[sample_query], n_results=1)
    assert query_result['ids'][0], f"Sample query on '{name}' collection returned no results."
    logger.info(f"SUCCESS: Sample query on '{name}' returned: {query_result['ids'][0][0]}")


def _remove_test_db_with_retry(db_path, attempts=5, delay_seconds=2):
    """Remove *db_path* with retries, riding out Windows file-lock races.

    Logs an error (rather than raising) if the directory still cannot be
    removed after *attempts* tries, matching the original best-effort cleanup.
    """
    if not os.path.exists(db_path):
        return
    for attempt in range(1, attempts + 1):
        try:
            shutil.rmtree(db_path)
            logger.info(f"Successfully removed test database at: {db_path}")
            return
        except (PermissionError, OSError) as e:
            logger.warning(f"Cleanup attempt {attempt} failed: {e}. Retrying in {delay_seconds} seconds...")
            time.sleep(delay_seconds)
    logger.error("Failed to remove test database directory after multiple attempts. It may need to be removed manually.")
# Allow the test to be run directly as a script (e.g. `python test_vector_db.py`)
# in addition to being collected by a test runner.
if __name__ == "__main__":
    test_vector_db_population()