Spaces:
Runtime error
Runtime error
| use std::sync::Arc; | |
| use std::{fmt::Debug, sync::RwLock}; | |
| use super::config::{CustomResourceMemberlistProviderConfig, MemberlistProviderConfig}; | |
| use crate::system::{Receiver, Sender}; | |
| use crate::{ | |
| config::{Configurable, WorkerConfig}, | |
| errors::{ChromaError, ErrorCodes}, | |
| system::{Component, ComponentContext, Handler, StreamHandler}, | |
| }; | |
| use async_trait::async_trait; | |
| use futures::{StreamExt, TryStreamExt}; | |
| use k8s_openapi::api::events::v1::Event; | |
| use kube::{ | |
| api::Api, | |
| config, | |
| runtime::{watcher, watcher::Error as WatchError, WatchStreamExt}, | |
| Client, CustomResource, | |
| }; | |
| use schemars::JsonSchema; | |
| use serde::{Deserialize, Serialize}; | |
| use thiserror::Error; | |
| use tokio_util::sync::CancellationToken; | |
/* =========== Basic Types ============== */
/// A memberlist is a plain list of strings, one per member.
///
/// In this file the entries are filled from the `url` field of each [`Member`].
pub(crate) type Memberlist = Vec<String>;
| pub(crate) trait MemberlistProvider: Component + Configurable { | |
| fn subscribe(&mut self, receiver: Box<dyn Receiver<Memberlist> + Send>) -> (); | |
| } | |
/* =========== CRD ============== */
| pub(crate) struct MemberListCrd { | |
| pub(crate) members: Vec<Member>, | |
| } | |
/// A single item of the `members` array in [`MemberListCrd`].
///
/// NOTE(review): the event handler in this file prints a `Vec<Member>` with `{:?}`, which
/// needs a `Debug` implementation that is not visible here — confirm it is derived.
pub(crate) struct Member {
    /// URL of the member; this string is what ends up in a `Memberlist`.
    pub(crate) url: String,
}
/* =========== CR Provider ============== */
| pub(crate) struct CustomResourceMemberlistProvider { | |
| memberlist_name: String, | |
| kube_client: Client, | |
| kube_ns: String, | |
| memberlist_cr_client: Api<MemberListKubeResource>, | |
| queue_size: usize, | |
| current_memberlist: RwLock<Memberlist>, | |
| subscribers: Vec<Box<dyn Receiver<Memberlist> + Send>>, | |
| } | |
| impl Debug for CustomResourceMemberlistProvider { | |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
| f.debug_struct("CustomResourceMemberlistProvider") | |
| .field("memberlist_name", &self.memberlist_name) | |
| .field("kube_ns", &self.kube_ns) | |
| .field("queue_size", &self.queue_size) | |
| .finish() | |
| } | |
| } | |
| pub(crate) enum CustomResourceMemberlistProviderConfigurationError { | |
| FailedToLoadKubeClient( kube::Error), | |
| } | |
| impl ChromaError for CustomResourceMemberlistProviderConfigurationError { | |
| fn code(&self) -> crate::errors::ErrorCodes { | |
| match self { | |
| CustomResourceMemberlistProviderConfigurationError::FailedToLoadKubeClient(e) => { | |
| ErrorCodes::Internal | |
| } | |
| } | |
| } | |
| } | |
| impl Configurable for CustomResourceMemberlistProvider { | |
| async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> { | |
| let my_config = match &worker_config.memberlist_provider { | |
| MemberlistProviderConfig::CustomResource(config) => config, | |
| }; | |
| let kube_client = match Client::try_default().await { | |
| Ok(client) => client, | |
| Err(err) => { | |
| return Err(Box::new( | |
| CustomResourceMemberlistProviderConfigurationError::FailedToLoadKubeClient(err), | |
| )) | |
| } | |
| }; | |
| let memberlist_cr_client = Api::<MemberListKubeResource>::namespaced( | |
| kube_client.clone(), | |
| &worker_config.kube_namespace, | |
| ); | |
| let c: CustomResourceMemberlistProvider = CustomResourceMemberlistProvider { | |
| memberlist_name: my_config.memberlist_name.clone(), | |
| kube_ns: worker_config.kube_namespace.clone(), | |
| kube_client: kube_client, | |
| memberlist_cr_client: memberlist_cr_client, | |
| queue_size: my_config.queue_size, | |
| current_memberlist: RwLock::new(vec![]), | |
| subscribers: vec![], | |
| }; | |
| Ok(c) | |
| } | |
| } | |
| impl CustomResourceMemberlistProvider { | |
| fn new( | |
| memberlist_name: String, | |
| kube_client: Client, | |
| kube_ns: String, | |
| queue_size: usize, | |
| ) -> Self { | |
| let memberlist_cr_client = | |
| Api::<MemberListKubeResource>::namespaced(kube_client.clone(), &kube_ns); | |
| CustomResourceMemberlistProvider { | |
| memberlist_name: memberlist_name, | |
| kube_ns: kube_ns, | |
| kube_client: kube_client, | |
| memberlist_cr_client: memberlist_cr_client, | |
| queue_size: queue_size, | |
| current_memberlist: RwLock::new(vec![]), | |
| subscribers: vec![], | |
| } | |
| } | |
| fn connect_to_kube_stream(&self, ctx: &ComponentContext<CustomResourceMemberlistProvider>) { | |
| let memberlist_cr_client = | |
| Api::<MemberListKubeResource>::namespaced(self.kube_client.clone(), &self.kube_ns); | |
| let stream = watcher(memberlist_cr_client, watcher::Config::default()) | |
| .default_backoff() | |
| .applied_objects(); | |
| let stream = stream.then(|event| async move { | |
| match event { | |
| Ok(event) => { | |
| let event = event; | |
| println!("Kube stream event: {:?}", event); | |
| Some(event) | |
| } | |
| Err(err) => { | |
| println!("Error acquiring memberlist: {}", err); | |
| None | |
| } | |
| } | |
| }); | |
| self.register_stream(stream, ctx); | |
| } | |
| async fn notify_subscribers(&self) -> () { | |
| let curr_memberlist = match self.current_memberlist.read() { | |
| Ok(curr_memberlist) => curr_memberlist.clone(), | |
| Err(err) => { | |
| // TODO: Log error and attempt recovery | |
| return; | |
| } | |
| }; | |
| for subscriber in self.subscribers.iter() { | |
| let _ = subscriber.send(curr_memberlist.clone()).await; | |
| } | |
| } | |
| } | |
| impl Component for CustomResourceMemberlistProvider { | |
| fn queue_size(&self) -> usize { | |
| self.queue_size | |
| } | |
| fn on_start(&mut self, ctx: &ComponentContext<CustomResourceMemberlistProvider>) { | |
| self.connect_to_kube_stream(ctx); | |
| } | |
| } | |
| impl Handler<Option<MemberListKubeResource>> for CustomResourceMemberlistProvider { | |
| async fn handle( | |
| &mut self, | |
| event: Option<MemberListKubeResource>, | |
| _ctx: &ComponentContext<CustomResourceMemberlistProvider>, | |
| ) { | |
| match event { | |
| Some(memberlist) => { | |
| println!("Memberlist event in CustomResourceMemberlistProvider. Name: {:?}. Members: {:?}", memberlist.metadata.name, memberlist.spec.members); | |
| let name = match &memberlist.metadata.name { | |
| Some(name) => name, | |
| None => { | |
| // TODO: Log an error | |
| return; | |
| } | |
| }; | |
| if name != &self.memberlist_name { | |
| return; | |
| } | |
| let memberlist = memberlist.spec.members; | |
| let memberlist = memberlist | |
| .iter() | |
| .map(|member| member.url.clone()) | |
| .collect::<Vec<String>>(); | |
| { | |
| let curr_memberlist_handle = self.current_memberlist.write(); | |
| match curr_memberlist_handle { | |
| Ok(mut curr_memberlist) => { | |
| *curr_memberlist = memberlist; | |
| } | |
| Err(err) => { | |
| // TODO: Log an error | |
| } | |
| } | |
| } | |
| // Inform subscribers | |
| self.notify_subscribers().await; | |
| } | |
| None => { | |
| // Stream closed or error | |
| } | |
| } | |
| } | |
| } | |
| impl StreamHandler<Option<MemberListKubeResource>> for CustomResourceMemberlistProvider {} | |
| impl MemberlistProvider for CustomResourceMemberlistProvider { | |
| fn subscribe(&mut self, sender: Box<dyn Receiver<Memberlist> + Send>) -> () { | |
| self.subscribers.push(sender); | |
| } | |
| } | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::system::System;

    // NOTE(review): no test attribute (e.g. an async test macro) is visible on this
    // function, so as written nothing invokes it — confirm how it is meant to be run.
    async fn it_can_work() {
        // TODO: This only works if you have a kubernetes cluster running locally with a memberlist
        // We need to implement a test harness for this. For now, it will silently do nothing
        // if you don't have a kubernetes cluster running locally and only serve as a reminder
        // and demonstration of how to use the memberlist provider.
        let kube_ns = "chroma".to_string();
        // Return early instead of unwrapping, so that a machine without a reachable
        // cluster really does "silently do nothing" rather than panic.
        let kube_client = match Client::try_default().await {
            Ok(client) => client,
            Err(err) => {
                println!("Skipping memberlist provider demo, no kube client available: {}", err);
                return;
            }
        };
        let memberlist_provider = CustomResourceMemberlistProvider::new(
            "worker-memberlist".to_string(),
            kube_client,
            kube_ns,
            10,
        );
        let mut system = System::new();
        let _handle = system.start_component(memberlist_provider);
    }
}