Spaces:
Runtime error
Runtime error
| use async_trait::async_trait; | |
| use figment::providers::{Env, Format, Serialized, Yaml}; | |
| use serde::Deserialize; | |
| use crate::errors::ChromaError; | |
| /// Path of the YAML config file used by `RootConfig::load`, relative to the current working directory. | |
| const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml"; | |
| /// Prefix of the environment variables that can populate or override config values. | |
| const ENV_PREFIX: &str = "CHROMA_"; | |
| /// # Description | |
| /// The root configuration shared by all chroma services. It is read from a | |
| /// YAML file and, secondarily, fields can be populated from environment | |
| /// variables. The environment variables are prefixed with CHROMA_ and are | |
| /// uppercase. Values in the environment variables take precedence over | |
| /// values in the YAML file. | |
| /// By default, the file is read from the current working directory, | |
| /// with the filename chroma_config.yaml. | |
| /// # Notes | |
| /// `Deserialize` is derived because `RootConfig::load_from_path` builds this | |
| /// struct through figment's `extract`, which requires it. Every field type | |
| /// must therefore implement `Deserialize` as well. | |
| #[derive(Deserialize)] | |
| pub(crate) struct RootConfig { | |
|     // The root config object wraps the worker config object so that | |
|     // we can share the same config file between multiple services. | |
|     pub worker: WorkerConfig, | |
| } | |
| impl RootConfig { | |
|     /// # Description | |
|     /// Load the config from the default location. | |
|     /// # Returns | |
|     /// The config object. | |
|     /// # Panics | |
|     /// - If the config file exists but is not valid YAML. | |
|     /// - If the merged config does not contain the required fields. | |
|     /// - If the config file contains invalid values. | |
|     /// - If the environment variables contain invalid values. | |
|     /// # Notes | |
|     /// The default location is the current working directory, with the filename chroma_config.yaml. | |
|     /// A missing file is skipped; the config is then built from the environment and defaults only. | |
|     /// The environment variables are prefixed with CHROMA_ and are uppercase. | |
|     /// Values in the environment variables take precedence over values in the YAML file. | |
|     pub(crate) fn load() -> Self { | |
|         Self::load_from_path(DEFAULT_CONFIG_PATH) | |
|     } | |
|     /// # Description | |
|     /// Load the config from a specific location. | |
|     /// # Arguments | |
|     /// - path: The path to the config file. | |
|     /// # Returns | |
|     /// The config object. | |
|     /// # Panics | |
|     /// - If the config file exists but is not valid YAML. | |
|     /// - If the merged config does not contain the required fields. | |
|     /// - If the config file contains invalid values. | |
|     /// - If the environment variables contain invalid values. | |
|     /// # Notes | |
|     /// A missing file is skipped; the config is then built from the environment and defaults only. | |
|     /// The environment variables are prefixed with CHROMA_ and are uppercase. | |
|     /// Values in the environment variables take precedence over values in the YAML file. | |
|     pub(crate) fn load_from_path(path: &str) -> Self { | |
|         // Nested fields are addressed with a double underscore in the variable name, | |
|         // e.g. CHROMA_WORKER__MY_IP. Rewrite "__" to "." so figment treats the key as a | |
|         // nested path, while single underscores inside field names are left alone. | |
|         // Keys equal to "num_indexing_threads" or "my_ip" are passed through unchanged. | |
|         let env = Env::prefixed(ENV_PREFIX).map(|k| match k { | |
|             k if k == "num_indexing_threads" => k.into(), | |
|             k if k == "my_ip" => k.into(), | |
|             k => k.as_str().replace("__", ".").into(), | |
|         }); | |
|         let mut f = figment::Figment::from(env); | |
|         if std::path::Path::new(path).exists() { | |
|             // The YAML file is the base layer; merging the environment on top of it | |
|             // lets environment values win on conflicts. | |
|             f = figment::Figment::from(Yaml::file(path)).merge(f); | |
|         } | |
|         // Apply defaults - this seems to be the best way to do it. | |
|         // `join` only fills in the value when nothing above has set it. | |
|         // https://github.com/SergioBenitez/Figment/issues/77#issuecomment-1642490298 | |
|         f = f.join(Serialized::default( | |
|             "worker.num_indexing_threads", | |
|             num_cpus::get(), | |
|         )); | |
|         match f.extract() { | |
|             Ok(config) => config, | |
|             Err(e) => panic!("Error loading config: {}", e), | |
|         } | |
|     } | |
| } | |
| /// # Description | |
| /// The primary config for the worker service. | |
| /// ## Description of parameters | |
| /// - my_ip: The IP address of the worker service. Used for memberlist assignment. Must be provided. | |
| /// - num_indexing_threads: The number of indexing threads to use. If not provided, defaults to the number of cores on the machine. | |
| /// - pulsar_tenant: The pulsar tenant to use. Must be provided. | |
| /// - pulsar_namespace: The pulsar namespace to use. Must be provided. | |
| /// - assignment_policy: The assignment policy to use. Must be provided. | |
| /// - All remaining fields have no default applied by `RootConfig::load_from_path`, so they | |
| ///   must be provided by the YAML file or the environment. | |
| /// # Notes | |
| /// In order to set a field from the environment, name the variable CHROMA_WORKER__<FIELD_NAME>. | |
| /// For example, to set my_ip, you would set CHROMA_WORKER__MY_IP. | |
| /// Each submodule that needs to be configured from the config object should implement the Configurable trait and | |
| /// have its own field in this struct for its Config struct. | |
| /// `Deserialize` is derived so that this struct can be extracted as part of `RootConfig`; | |
| /// every nested config type must implement `Deserialize` as well. | |
| #[derive(Deserialize)] | |
| pub(crate) struct WorkerConfig { | |
|     pub(crate) my_ip: String, | |
|     pub(crate) my_port: u16, | |
|     pub(crate) num_indexing_threads: u32, | |
|     pub(crate) pulsar_tenant: String, | |
|     pub(crate) pulsar_namespace: String, | |
|     pub(crate) pulsar_url: String, | |
|     pub(crate) kube_namespace: String, | |
|     pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, | |
|     pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, | |
|     pub(crate) ingest: crate::ingest::config::IngestConfig, | |
|     pub(crate) sysdb: crate::sysdb::config::SysDbConfig, | |
|     pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig, | |
|     pub(crate) storage: crate::storage::config::StorageConfig, | |
| } | |
| /// # Description | |
| /// A trait for configuring a struct from a config object. | |
| /// # Notes | |
| /// Components that need to be configured from the config object should implement this trait. | |
| /// `try_from_config` is an associated constructor: it receives the worker config and returns | |
| /// either the configured component or a boxed `ChromaError`. | |
| /// The trait is declared with `#[async_trait]`, so implementations are expected to carry the | |
| /// same attribute on their `impl` block. | |
| #[async_trait] | |
| pub(crate) trait Configurable { | |
|     async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> | |
|     where | |
|         Self: Sized; | |
| } | |
| // Unit tests for config loading. Each test runs inside a figment `Jail`, which provides a | |
| // temporary working directory and sandboxed environment variables. | |
| // NOTE(review): the nesting of the YAML fixtures is reconstructed from the `WorkerConfig` | |
| // field names; the schemas of the nested config types are not visible here, so confirm the | |
| // layout below `assignment_policy`, `memberlist_provider`, `sysdb` and `storage`. | |
| #[cfg(test)] | |
| mod tests { | |
|     use super::*; | |
|     use figment::Jail; | |
|     // A complete chroma_config.yaml in the working directory is picked up by `load()`. | |
|     #[test] | |
|     fn test_config_from_default_path() { | |
|         Jail::expect_with(|jail| { | |
|             jail.create_file( | |
|                 "chroma_config.yaml", | |
|                 r#" | |
|                 worker: | |
|                     my_ip: "192.0.0.1" | |
|                     my_port: 50051 | |
|                     num_indexing_threads: 4 | |
|                     pulsar_tenant: "public" | |
|                     pulsar_namespace: "default" | |
|                     pulsar_url: "pulsar://localhost:6650" | |
|                     kube_namespace: "chroma" | |
|                     assignment_policy: | |
|                         RendezvousHashing: | |
|                             hasher: Murmur3 | |
|                     memberlist_provider: | |
|                         CustomResource: | |
|                             memberlist_name: "worker-memberlist" | |
|                             queue_size: 100 | |
|                     ingest: | |
|                         queue_size: 100 | |
|                     sysdb: | |
|                         Grpc: | |
|                             host: "localhost" | |
|                             port: 50051 | |
|                     segment_manager: | |
|                         storage_path: "/tmp" | |
|                     storage: | |
|                         S3: | |
|                             bucket: "chroma" | |
|                 "#, | |
|             )?; | |
|             let config = RootConfig::load(); | |
|             assert_eq!(config.worker.my_ip, "192.0.0.1"); | |
|             assert_eq!(config.worker.num_indexing_threads, 4); | |
|             assert_eq!(config.worker.pulsar_tenant, "public"); | |
|             assert_eq!(config.worker.pulsar_namespace, "default"); | |
|             assert_eq!(config.worker.kube_namespace, "chroma"); | |
|             Ok(()) | |
|         }); | |
|     } | |
|     // The same complete config is loaded when it lives at an explicitly given path. | |
|     #[test] | |
|     fn test_config_from_specific_path() { | |
|         Jail::expect_with(|jail| { | |
|             jail.create_file( | |
|                 "random_path.yaml", | |
|                 r#" | |
|                 worker: | |
|                     my_ip: "192.0.0.1" | |
|                     my_port: 50051 | |
|                     num_indexing_threads: 4 | |
|                     pulsar_tenant: "public" | |
|                     pulsar_namespace: "default" | |
|                     pulsar_url: "pulsar://localhost:6650" | |
|                     kube_namespace: "chroma" | |
|                     assignment_policy: | |
|                         RendezvousHashing: | |
|                             hasher: Murmur3 | |
|                     memberlist_provider: | |
|                         CustomResource: | |
|                             memberlist_name: "worker-memberlist" | |
|                             queue_size: 100 | |
|                     ingest: | |
|                         queue_size: 100 | |
|                     sysdb: | |
|                         Grpc: | |
|                             host: "localhost" | |
|                             port: 50051 | |
|                     segment_manager: | |
|                         storage_path: "/tmp" | |
|                     storage: | |
|                         S3: | |
|                             bucket: "chroma" | |
|                 "#, | |
|             )?; | |
|             let config = RootConfig::load_from_path("random_path.yaml"); | |
|             assert_eq!(config.worker.my_ip, "192.0.0.1"); | |
|             assert_eq!(config.worker.num_indexing_threads, 4); | |
|             assert_eq!(config.worker.pulsar_tenant, "public"); | |
|             assert_eq!(config.worker.pulsar_namespace, "default"); | |
|             assert_eq!(config.worker.kube_namespace, "chroma"); | |
|             Ok(()) | |
|         }); | |
|     } | |
|     // A config that omits required worker fields must make `load()` panic. | |
|     #[test] | |
|     #[should_panic] | |
|     fn test_config_missing_required_field() { | |
|         Jail::expect_with(|jail| { | |
|             jail.create_file( | |
|                 "chroma_config.yaml", | |
|                 r#" | |
|                 worker: | |
|                     num_indexing_threads: 4 | |
|                 "#, | |
|             )?; | |
|             let _ = RootConfig::load(); | |
|             Ok(()) | |
|         }); | |
|     } | |
|     // When num_indexing_threads is omitted it defaults to the number of CPUs. | |
|     #[test] | |
|     fn test_missing_default_field() { | |
|         Jail::expect_with(|jail| { | |
|             jail.create_file( | |
|                 "chroma_config.yaml", | |
|                 r#" | |
|                 worker: | |
|                     my_ip: "192.0.0.1" | |
|                     my_port: 50051 | |
|                     pulsar_tenant: "public" | |
|                     pulsar_namespace: "default" | |
|                     kube_namespace: "chroma" | |
|                     pulsar_url: "pulsar://localhost:6650" | |
|                     assignment_policy: | |
|                         RendezvousHashing: | |
|                             hasher: Murmur3 | |
|                     memberlist_provider: | |
|                         CustomResource: | |
|                             memberlist_name: "worker-memberlist" | |
|                             queue_size: 100 | |
|                     ingest: | |
|                         queue_size: 100 | |
|                     sysdb: | |
|                         Grpc: | |
|                             host: "localhost" | |
|                             port: 50051 | |
|                     segment_manager: | |
|                         storage_path: "/tmp" | |
|                     storage: | |
|                         S3: | |
|                             bucket: "chroma" | |
|                 "#, | |
|             )?; | |
|             let config = RootConfig::load(); | |
|             assert_eq!(config.worker.my_ip, "192.0.0.1"); | |
|             assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32); | |
|             Ok(()) | |
|         }); | |
|     } | |
|     // Scalar worker fields missing from the YAML file are supplied through | |
|     // CHROMA_WORKER__* environment variables. | |
|     #[test] | |
|     fn test_config_with_env_override() { | |
|         Jail::expect_with(|jail| { | |
|             jail.set_env("CHROMA_WORKER__MY_IP", "192.0.0.1"); | |
|             jail.set_env("CHROMA_WORKER__MY_PORT", 50051); | |
|             jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A"); | |
|             jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B"); | |
|             jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C"); | |
|             jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650"); | |
|             jail.create_file( | |
|                 "chroma_config.yaml", | |
|                 r#" | |
|                 worker: | |
|                     assignment_policy: | |
|                         RendezvousHashing: | |
|                             hasher: Murmur3 | |
|                     memberlist_provider: | |
|                         CustomResource: | |
|                             memberlist_name: "worker-memberlist" | |
|                             queue_size: 100 | |
|                     ingest: | |
|                         queue_size: 100 | |
|                     sysdb: | |
|                         Grpc: | |
|                             host: "localhost" | |
|                             port: 50051 | |
|                     segment_manager: | |
|                         storage_path: "/tmp" | |
|                     storage: | |
|                         S3: | |
|                             bucket: "chroma" | |
|                 "#, | |
|             )?; | |
|             let config = RootConfig::load(); | |
|             assert_eq!(config.worker.my_ip, "192.0.0.1"); | |
|             assert_eq!(config.worker.my_port, 50051); | |
|             assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32); | |
|             assert_eq!(config.worker.pulsar_tenant, "A"); | |
|             assert_eq!(config.worker.pulsar_namespace, "B"); | |
|             assert_eq!(config.worker.kube_namespace, "C"); | |
|             Ok(()) | |
|         }); | |
|     } | |
| } | |