Spaces:
Runtime error
Runtime error
| # A thin wrapper around the pulsar admin api | |
| import requests | |
| from chromadb.config import System | |
| from chromadb.ingest.impl.utils import parse_topic_name | |
class PulsarAdmin:
    """A thin wrapper around the Pulsar admin REST API.

    Only used for interim development towards distributed chroma. This
    functionality will be moved to the chroma coordinator.

    All calls are plain HTTP requests against
    ``http://<pulsar_broker_url>:<pulsar_admin_port>``. A response status of
    204 or 409 is treated as success; any other status raises
    ``RuntimeError``.
    """

    # Response codes that the methods below accept without raising.
    # 204 is "No Content"; 409 is "Conflict" (for the create calls this
    # presumably means the resource already exists -- confirm against the
    # Pulsar admin REST reference).
    _OK_STATUS_CODES = (204, 409)

    # Upper bound, in seconds, for each admin request. Without a timeout
    # `requests` waits indefinitely on an unresponsive endpoint.
    _REQUEST_TIMEOUT_SECONDS = 30.0

    # The only tenant / namespace currently supported for topic operations.
    _DEFAULT_TENANT = "default"
    _DEFAULT_NAMESPACE = "default"

    _connection_str: str

    def __init__(self, system: System):
        """Build the admin base URL from settings and bootstrap defaults.

        Reads the required settings ``pulsar_broker_url`` and
        ``pulsar_admin_port`` from ``system.settings``, then creates the
        "default" tenant and the "default" namespace inside it.

        Raises:
            RuntimeError: If creating the default tenant or namespace fails.
        """
        pulsar_host = system.settings.require("pulsar_broker_url")
        pulsar_port = system.settings.require("pulsar_admin_port")
        self._connection_str = f"http://{pulsar_host}:{pulsar_port}"

        # Create the default tenant and namespace.
        # This is a temporary workaround until we have a proper
        # tenant/namespace management system.
        self.create_tenant("default")
        self.create_namespace("default", "default")

    def _check_response(self, response: requests.Response, failure: str) -> None:
        """Raise ``RuntimeError`` unless ``response`` has an accepted status.

        The error message starts with ``failure`` and appends the HTTP status
        code and response body so the cause is visible to the caller.
        """
        if response.status_code not in self._OK_STATUS_CODES:
            raise RuntimeError(
                f"{failure}: HTTP {response.status_code} {response.text}"
            )

    def _default_topic_name(self, topic: str) -> str:
        """Parse ``topic`` and return its bare topic name.

        Raises:
            ValueError: If the parsed tenant or namespace is not "default".
        """
        tenant, namespace, topic_name = parse_topic_name(topic)
        if tenant != self._DEFAULT_TENANT:
            raise ValueError(f"Only the default tenant is supported, got {tenant}")
        if namespace != self._DEFAULT_NAMESPACE:
            raise ValueError(
                f"Only the default namespace is supported, got {namespace}"
            )
        return topic_name

    def _topic_url(self, topic_name: str) -> str:
        """Return the admin URL of a persistent topic in default/default."""
        path = (
            f"/admin/v2/persistent/{self._DEFAULT_TENANT}"
            f"/{self._DEFAULT_NAMESPACE}/{topic_name}"
        )
        return self._connection_str + path

    def create_tenant(self, tenant: str) -> None:
        """Make a PUT request to the admin api to create the tenant.

        Raises:
            RuntimeError: If the response status is neither 204 nor 409.
        """
        url = f"{self._connection_str}/admin/v2/tenants/{tenant}"
        response = requests.put(
            url,
            json={"allowedClusters": ["standalone"], "adminRoles": []},
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )  # TODO: how to manage clusters?
        self._check_response(response, f"Failed to create tenant {tenant}")

    def create_namespace(self, tenant: str, namespace: str) -> None:
        """Make a PUT request to the admin api to create the namespace.

        Raises:
            RuntimeError: If the response status is neither 204 nor 409.
        """
        url = f"{self._connection_str}/admin/v2/namespaces/{tenant}/{namespace}"
        response = requests.put(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._check_response(response, f"Failed to create namespace {namespace}")

    def create_topic(self, topic: str) -> None:
        """Make a PUT request to the admin api to create a persistent topic.

        Raises:
            ValueError: If the topic is not in the default tenant/namespace.
            RuntimeError: If the response status is neither 204 nor 409.
        """
        # TODO: support non-persistent topics?
        topic_name = self._default_topic_name(topic)
        response = requests.put(
            self._topic_url(topic_name), timeout=self._REQUEST_TIMEOUT_SECONDS
        )
        self._check_response(response, f"Failed to create topic {topic_name}")

    def delete_topic(self, topic: str) -> None:
        """Make a DELETE request to the admin api to force delete the topic.

        Raises:
            ValueError: If the topic is not in the default tenant/namespace.
            RuntimeError: If the response status is neither 204 nor 409.
        """
        topic_name = self._default_topic_name(topic)
        # Force delete the topic
        url = self._topic_url(topic_name) + "?force=true"
        response = requests.delete(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        # NOTE(review): 409 is accepted here exactly as for the create calls;
        # confirm that silently ignoring a conflict on delete is intended.
        self._check_response(response, f"Failed to delete topic {topic_name}")