|
|
from datetime import timedelta |
|
|
|
|
|
from langchain_community.vectorstores import CouchbaseVectorStore |
|
|
|
|
|
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store |
|
|
from langflow.helpers.data import docs_to_data |
|
|
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
    """Langflow component wrapping LangChain's ``CouchbaseVectorStore``.

    The component authenticates against a Couchbase cluster with a
    username/password pair, builds a vector store bound to the configured
    bucket, scope, collection and search index, and exposes a similarity
    search over it.
    """

    display_name = "Couchbase"
    description = "Couchbase Vector Store with search capabilities"
    # Points at the vector store integration page, not the Couchbase document loader page.
    documentation = "https://python.langchain.com/v0.1/docs/integrations/vectorstores/couchbase/"
    name = "Couchbase"
    icon = "Couchbase"

    inputs = [
        SecretStrInput(
            name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True
        ),
        StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
        SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
        StrInput(name="bucket_name", display_name="Bucket Name", required=True),
        StrInput(name="scope_name", display_name="Scope Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CouchbaseVectorStore:
        """Connect to Couchbase and return a vector store for the configured collection.

        When ``ingest_data`` yields at least one item, the store is created
        through ``CouchbaseVectorStore.from_documents`` with those items;
        otherwise the plain constructor is used.

        Returns:
            CouchbaseVectorStore: store bound to the configured bucket, scope,
            collection, embedding and index.

        Raises:
            ImportError: if the Couchbase SDK modules cannot be imported.
            ValueError: if creating the cluster connection or the
                ``wait_until_ready`` call (5 second timeout) raises.
        """
        # Imported lazily so the component module loads even when the optional SDK is absent.
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
            raise ImportError(msg) from e

        # Any failure while connecting is reported to the caller as a ValueError.
        try:
            auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
            options = ClusterOptions(auth)
            cluster = Cluster(self.couchbase_connection_string, options)
            cluster.wait_until_ready(timedelta(seconds=5))
        except Exception as e:
            msg = f"Failed to connect to Couchbase: {e}"
            raise ValueError(msg) from e

        # Data objects are converted to LangChain documents; anything else is passed through unchanged.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []
        ]

        # Arguments shared by both construction paths.
        store_kwargs = {
            "cluster": cluster,
            "bucket_name": self.bucket_name,
            "scope_name": self.scope_name,
            "collection_name": self.collection_name,
            "embedding": self.embedding,
            "index_name": self.index_name,
        }

        if documents:
            return CouchbaseVectorStore.from_documents(documents=documents, **store_kwargs)
        return CouchbaseVectorStore(**store_kwargs)

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query`` against the vector store.

        The vector store is always built first, even when no query is given.

        Returns:
            list[Data]: results of ``similarity_search`` (called with
            ``k=number_of_results``) converted via ``docs_to_data``, or an
            empty list when the query is missing, not a string, or blank.
        """
        vector_store = self.build_vector_store()

        query = self.search_query
        if not (query and isinstance(query, str) and query.strip()):
            return []

        docs = vector_store.similarity_search(
            query=query,
            k=self.number_of_results,
        )
        data = docs_to_data(docs)
        self.status = data
        return data
|
|
|