Spaces:
Runtime error
Runtime error
| use crate::{ | |
| config::{Configurable, WorkerConfig}, | |
| errors::ChromaError, | |
| }; | |
| use super::{ | |
| config::{AssignmentPolicyConfig, HasherType}, | |
| rendezvous_hash::{assign, AssignmentError, Murmur3Hasher}, | |
| }; | |
| use async_trait::async_trait; | |
| /* | |
| =========================================== | |
| Interfaces | |
| =========================================== | |
| */ | |
| /// AssignmentPolicy is a trait that defines how to assign a key to a set of members. | |
| /// # Notes | |
| /// This trait mirrors the go and python versions of the assignment policy | |
| /// interface. | |
| /// # Methods | |
| /// - assign: Assign a key to a topic. | |
| /// - get_members: Get the members that can be assigned to. | |
| /// - set_members: Set the members that can be assigned to. | |
| /// # Notes | |
| /// An assignment policy is not responsible for creating the topics it assigns to. | |
| /// It is the responsibility of the caller to ensure that the topics exist. | |
| /// An assignment policy must be Send. | |
| pub(crate) trait AssignmentPolicy: Send { | |
| fn assign(&self, key: &str) -> Result<String, AssignmentError>; | |
| fn get_members(&self) -> Vec<String>; | |
| fn set_members(&mut self, members: Vec<String>); | |
| } | |
| /* | |
| =========================================== | |
| Implementation | |
| =========================================== | |
| */ | |
| pub(crate) struct RendezvousHashingAssignmentPolicy { | |
| hasher: Murmur3Hasher, | |
| members: Vec<String>, | |
| } | |
| impl RendezvousHashingAssignmentPolicy { | |
| // Rust beginners note | |
| // The reason we take String and not &str is because we need to put the strings into our | |
| // struct, and we can't do that with references so rather than clone the strings, we just | |
| // take ownership of them and put the responsibility on the caller to clone them if they | |
| // need to. This is the general pattern we should follow in rust - put the burden of cloning | |
| // on the caller, and if they don't need to clone, they can pass ownership. | |
| pub(crate) fn new( | |
| pulsar_tenant: String, | |
| pulsar_namespace: String, | |
| ) -> RendezvousHashingAssignmentPolicy { | |
| return RendezvousHashingAssignmentPolicy { | |
| hasher: Murmur3Hasher {}, | |
| members: vec![], | |
| }; | |
| } | |
| pub(crate) fn set_members(&mut self, members: Vec<String>) { | |
| self.members = members; | |
| } | |
| } | |
| impl Configurable for RendezvousHashingAssignmentPolicy { | |
| async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> { | |
| let assignment_policy_config = match &worker_config.assignment_policy { | |
| AssignmentPolicyConfig::RendezvousHashing(config) => config, | |
| }; | |
| let hasher = match assignment_policy_config.hasher { | |
| HasherType::Murmur3 => Murmur3Hasher {}, | |
| }; | |
| return Ok(RendezvousHashingAssignmentPolicy { | |
| hasher: hasher, | |
| members: vec![], | |
| }); | |
| } | |
| } | |
| impl AssignmentPolicy for RendezvousHashingAssignmentPolicy { | |
| fn assign(&self, key: &str) -> Result<String, AssignmentError> { | |
| let topics = self.get_members(); | |
| let topic = assign(key, topics, &self.hasher); | |
| return topic; | |
| } | |
| fn get_members(&self) -> Vec<String> { | |
| // This is not designed to be used frequently for now, nor is the number of members | |
| // expected to be large, so we can just clone the members | |
| return self.members.clone(); | |
| } | |
| fn set_members(&mut self, members: Vec<String>) { | |
| self.members = members; | |
| } | |
| } | |