Spaces:
Running
Running
| from datetime import timedelta | |
| from langchain_community.vectorstores import CouchbaseVectorStore | |
| from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
| from langflow.helpers.data import docs_to_data | |
| from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput | |
| from langflow.schema import Data | |
| class CouchbaseVectorStoreComponent(LCVectorStoreComponent): | |
| display_name = "Couchbase" | |
| description = "Couchbase Vector Store with search capabilities" | |
| documentation = "https://python.langchain.com/v0.1/docs/integrations/document_loaders/couchbase/" | |
| name = "Couchbase" | |
| icon = "Couchbase" | |
| inputs = [ | |
| SecretStrInput( | |
| name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True | |
| ), | |
| StrInput(name="couchbase_username", display_name="Couchbase username", required=True), | |
| SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True), | |
| StrInput(name="bucket_name", display_name="Bucket Name", required=True), | |
| StrInput(name="scope_name", display_name="Scope Name", required=True), | |
| StrInput(name="collection_name", display_name="Collection Name", required=True), | |
| StrInput(name="index_name", display_name="Index Name", required=True), | |
| MultilineInput(name="search_query", display_name="Search Query"), | |
| DataInput( | |
| name="ingest_data", | |
| display_name="Ingest Data", | |
| is_list=True, | |
| ), | |
| HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), | |
| IntInput( | |
| name="number_of_results", | |
| display_name="Number of Results", | |
| info="Number of results to return.", | |
| value=4, | |
| advanced=True, | |
| ), | |
| ] | |
| def build_vector_store(self) -> CouchbaseVectorStore: | |
| try: | |
| from couchbase.auth import PasswordAuthenticator | |
| from couchbase.cluster import Cluster | |
| from couchbase.options import ClusterOptions | |
| except ImportError as e: | |
| msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`" | |
| raise ImportError(msg) from e | |
| try: | |
| auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password) | |
| options = ClusterOptions(auth) | |
| cluster = Cluster(self.couchbase_connection_string, options) | |
| cluster.wait_until_ready(timedelta(seconds=5)) | |
| except Exception as e: | |
| msg = f"Failed to connect to Couchbase: {e}" | |
| raise ValueError(msg) from e | |
| documents = [] | |
| for _input in self.ingest_data or []: | |
| if isinstance(_input, Data): | |
| documents.append(_input.to_lc_document()) | |
| else: | |
| documents.append(_input) | |
| if documents: | |
| couchbase_vs = CouchbaseVectorStore.from_documents( | |
| documents=documents, | |
| cluster=cluster, | |
| bucket_name=self.bucket_name, | |
| scope_name=self.scope_name, | |
| collection_name=self.collection_name, | |
| embedding=self.embedding, | |
| index_name=self.index_name, | |
| ) | |
| else: | |
| couchbase_vs = CouchbaseVectorStore( | |
| cluster=cluster, | |
| bucket_name=self.bucket_name, | |
| scope_name=self.scope_name, | |
| collection_name=self.collection_name, | |
| embedding=self.embedding, | |
| index_name=self.index_name, | |
| ) | |
| return couchbase_vs | |
| def search_documents(self) -> list[Data]: | |
| vector_store = self.build_vector_store() | |
| if self.search_query and isinstance(self.search_query, str) and self.search_query.strip(): | |
| docs = vector_store.similarity_search( | |
| query=self.search_query, | |
| k=self.number_of_results, | |
| ) | |
| data = docs_to_data(docs) | |
| self.status = data | |
| return data | |
| return [] | |