|
|
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store |
|
|
from langflow.helpers.data import docs_to_data |
|
|
from langflow.io import ( |
|
|
BoolInput, |
|
|
DataInput, |
|
|
DictInput, |
|
|
DropdownInput, |
|
|
FloatInput, |
|
|
HandleInput, |
|
|
IntInput, |
|
|
MultilineInput, |
|
|
SecretStrInput, |
|
|
StrInput, |
|
|
) |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class MilvusVectorStoreComponent(LCVectorStoreComponent):
    """Milvus vector store with search capabilities.

    The component declares the inputs needed to connect to a Milvus
    collection, optionally ingests the supplied data into it, and runs a
    similarity search for ``search_query`` against the resulting store.
    """

    display_name: str = "Milvus"
    description: str = "Milvus vector store with search capabilities"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/milvus"
    name = "Milvus"
    icon = "Milvus"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", value="langflow"),
        StrInput(name="collection_description", display_name="Collection Description", value=""),
        StrInput(
            name="uri",
            display_name="Connection URI",
            value="http://localhost:19530",
        ),
        # The input is named "password" but is forwarded to Milvus as the
        # connection ``token`` (see build_vector_store).
        SecretStrInput(
            name="password",
            display_name="Token",
            value="",
            info="Ignore this field if no token is required to make connection.",
        ),
        DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True),
        StrInput(name="primary_field", display_name="Primary Field Name", value="pk"),
        StrInput(name="text_field", display_name="Text Field Name", value="text"),
        StrInput(name="vector_field", display_name="Vector Field Name", value="vector"),
        DropdownInput(
            name="consistency_level",
            display_name="Consistency Level",
            options=["Bounded", "Session", "Strong", "Eventual"],
            value="Session",
            advanced=True,
        ),
        DictInput(name="index_params", display_name="Index Parameters", advanced=True),
        DictInput(name="search_params", display_name="Search Parameters", advanced=True),
        BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True),
        FloatInput(name="timeout", display_name="Timeout", advanced=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        """Create the Milvus vector store and ingest any supplied data.

        Returns:
            The ``langchain_milvus`` ``Milvus`` instance configured from this
            component's inputs. Every item in ``ingest_data`` is added to it
            before it is returned.

        Raises:
            ImportError: If the ``langchain-milvus`` package is not installed.
        """
        # Imported lazily so the component can be loaded without the optional
        # Milvus integration package being installed.
        try:
            from langchain_milvus.vectorstores import Milvus as LangchainMilvus
        except ImportError as e:
            msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`."
            raise ImportError(msg) from e

        # Merge into a fresh dict instead of calling ``update`` on the input:
        # this keeps the component's own ``connection_args`` value untouched
        # (the token is not written back into it) and tolerates a missing
        # value. ``uri`` and ``token`` still override same-named user keys.
        connection_args = {
            **(self.connection_args or {}),
            "uri": self.uri,
            "token": self.password,
        }

        milvus_store = LangchainMilvus(
            embedding_function=self.embedding,
            collection_name=self.collection_name,
            collection_description=self.collection_description,
            connection_args=connection_args,
            consistency_level=self.consistency_level,
            index_params=self.index_params,
            search_params=self.search_params,
            drop_old=self.drop_old,
            # auto_id is always enabled here; it is not exposed as an input.
            auto_id=True,
            primary_field=self.primary_field,
            text_field=self.text_field,
            vector_field=self.vector_field,
            timeout=self.timeout,
        )

        # ``Data`` items are converted to LangChain documents; anything else
        # is passed through unchanged.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        if documents:
            milvus_store.add_documents(documents)

        return milvus_store

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query`` against the store.

        The vector store is built first in every case, so any ``ingest_data``
        is processed even when no query is given.

        Returns:
            The matching documents converted with ``docs_to_data``, or an
            empty list when ``search_query`` is not a non-blank string.
        """
        vector_store = self.build_vector_store()

        query = self.search_query
        # Skip the search for empty, non-string or whitespace-only queries.
        if not (query and isinstance(query, str) and query.strip()):
            return []

        docs = vector_store.similarity_search(
            query=query,
            k=self.number_of_results,
        )

        data = docs_to_data(docs)
        # The results are also stored on ``status``.
        self.status = data
        return data
|
|
|