| import subprocess |
| import time |
|
|
| from core.rag.datasource.vdb.couchbase.couchbase_vector import CouchbaseConfig, CouchbaseVector |
| from tests.integration_tests.vdb.test_vector_store import ( |
| AbstractVectorTest, |
| get_example_text, |
| setup_mock_redis, |
| ) |
|
|
|
|
| def wait_for_healthy_container(service_name="couchbase-server", timeout=300): |
| start_time = time.time() |
| while time.time() - start_time < timeout: |
| result = subprocess.run( |
| ["docker", "inspect", "--format", "{{.State.Health.Status}}", service_name], capture_output=True, text=True |
| ) |
| if result.stdout.strip() == "healthy": |
| print(f"{service_name} is healthy!") |
| return True |
| else: |
| print(f"Waiting for {service_name} to be healthy...") |
| time.sleep(10) |
| raise TimeoutError(f"{service_name} did not become healthy in time") |
|
|
|
|
| class CouchbaseTest(AbstractVectorTest): |
| def __init__(self): |
| super().__init__() |
| self.vector = CouchbaseVector( |
| collection_name=self.collection_name, |
| config=CouchbaseConfig( |
| connection_string="couchbase://127.0.0.1", |
| user="Administrator", |
| password="password", |
| bucket_name="Embeddings", |
| scope_name="_default", |
| ), |
| ) |
|
|
| def search_by_vector(self): |
| |
| time.sleep(5) |
| hits_by_vector = self.vector.search_by_vector(query_vector=self.example_embedding) |
| assert len(hits_by_vector) == 1 |
|
|
|
|
| def test_couchbase(setup_mock_redis): |
| wait_for_healthy_container("couchbase-server", timeout=60) |
| CouchbaseTest().run_all_tests() |
|
|