Spaces:
Runtime error
Runtime error
| # Tests the CustomResourceMemberlist provider | |
| import threading | |
| from chromadb.test.conftest import skip_if_not_cluster | |
| from kubernetes import client, config | |
| from chromadb.config import System, Settings | |
| from chromadb.segment.distributed import Memberlist | |
| from chromadb.segment.impl.distributed.segment_directory import ( | |
| CustomResourceMemberlistProvider, | |
| KUBERNETES_GROUP, | |
| KUBERNETES_NAMESPACE, | |
| ) | |
| import time | |
| # Used for testing to update the memberlist CRD | |
| def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Memberlist: | |
| config.load_config() | |
| api_instance = client.CustomObjectsApi() | |
| members = [{"url": f"10.0.0.{i}"} for i in range(1, n + 1)] | |
| body = { | |
| "kind": "MemberList", | |
| "metadata": {"name": memberlist_name}, | |
| "spec": {"members": members}, | |
| } | |
| _ = api_instance.patch_namespaced_custom_object( | |
| group=KUBERNETES_GROUP, | |
| version="v1", | |
| namespace=KUBERNETES_NAMESPACE, | |
| plural="memberlists", | |
| name=memberlist_name, | |
| body=body, | |
| ) | |
| return [m["url"] for m in members] | |
| def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool: | |
| return sorted(m1) == sorted(m2) | |
| def test_can_get_memberlist() -> None: | |
| # This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
| system = System(Settings(allow_reset=True)) | |
| provider = system.instance(CustomResourceMemberlistProvider) | |
| provider.set_memberlist_name("test-memberlist") | |
| system.reset_state() | |
| system.start() | |
| # Update the memberlist | |
| members = update_memberlist(3) | |
| # Check that the memberlist is updated after a short delay | |
| time.sleep(2) | |
| assert compare_memberlists(provider.get_memberlist(), members) | |
| system.stop() | |
| def test_can_update_memberlist_multiple_times() -> None: | |
| # This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
| system = System(Settings(allow_reset=True)) | |
| provider = system.instance(CustomResourceMemberlistProvider) | |
| provider.set_memberlist_name("test-memberlist") | |
| system.reset_state() | |
| system.start() | |
| # Update the memberlist | |
| members = update_memberlist(3) | |
| # Check that the memberlist is updated after a short delay | |
| time.sleep(2) | |
| assert compare_memberlists(provider.get_memberlist(), members) | |
| # Update the memberlist again | |
| members = update_memberlist(5) | |
| # Check that the memberlist is updated after a short delay | |
| time.sleep(2) | |
| assert compare_memberlists(provider.get_memberlist(), members) | |
| system.stop() | |
| def test_stop_memberlist_kills_thread() -> None: | |
| # This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
| system = System(Settings(allow_reset=True)) | |
| provider = system.instance(CustomResourceMemberlistProvider) | |
| provider.set_memberlist_name("test-memberlist") | |
| system.reset_state() | |
| system.start() | |
| # Make sure a background thread is running | |
| assert len(threading.enumerate()) == 2 | |
| # Update the memberlist | |
| members = update_memberlist(3) | |
| # Check that the memberlist is updated after a short delay | |
| time.sleep(2) | |
| assert compare_memberlists(provider.get_memberlist(), members) | |
| # Stop the system | |
| system.stop() | |
| # Check to make sure only one thread is running | |
| assert len(threading.enumerate()) == 1 | |