|
|
from pathlib import Path |
|
|
|
|
|
from langchain.text_splitter import CharacterTextSplitter |
|
|
from langchain_community.vectorstores.redis import Redis |
|
|
|
|
|
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store |
|
|
from langflow.helpers.data import docs_to_data |
|
|
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class RedisVectorStoreComponent(LCVectorStoreComponent): |
|
|
"""A custom component for implementing a Vector Store using Redis.""" |
|
|
|
|
|
display_name: str = "Redis" |
|
|
description: str = "Implementation of Vector Store using Redis" |
|
|
documentation = "https://python.langchain.com/docs/integrations/vectorstores/redis" |
|
|
name = "Redis" |
|
|
icon = "Redis" |
|
|
|
|
|
inputs = [ |
|
|
SecretStrInput(name="redis_server_url", display_name="Redis Server Connection String", required=True), |
|
|
StrInput( |
|
|
name="redis_index_name", |
|
|
display_name="Redis Index", |
|
|
), |
|
|
StrInput(name="code", display_name="Code", advanced=True), |
|
|
StrInput( |
|
|
name="schema", |
|
|
display_name="Schema", |
|
|
), |
|
|
MultilineInput(name="search_query", display_name="Search Query"), |
|
|
DataInput( |
|
|
name="ingest_data", |
|
|
display_name="Ingest Data", |
|
|
is_list=True, |
|
|
), |
|
|
IntInput( |
|
|
name="number_of_results", |
|
|
display_name="Number of Results", |
|
|
info="Number of results to return.", |
|
|
value=4, |
|
|
advanced=True, |
|
|
), |
|
|
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), |
|
|
] |
|
|
|
|
|
@check_cached_vector_store |
|
|
def build_vector_store(self) -> Redis: |
|
|
documents = [] |
|
|
|
|
|
for _input in self.ingest_data or []: |
|
|
if isinstance(_input, Data): |
|
|
documents.append(_input.to_lc_document()) |
|
|
else: |
|
|
documents.append(_input) |
|
|
Path("docuemnts.txt").write_text(str(documents), encoding="utf-8") |
|
|
|
|
|
if not documents: |
|
|
if self.schema is None: |
|
|
msg = "If no documents are provided, a schema must be provided." |
|
|
raise ValueError(msg) |
|
|
redis_vs = Redis.from_existing_index( |
|
|
embedding=self.embedding, |
|
|
index_name=self.redis_index_name, |
|
|
schema=self.schema, |
|
|
key_prefix=None, |
|
|
redis_url=self.redis_server_url, |
|
|
) |
|
|
else: |
|
|
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) |
|
|
docs = text_splitter.split_documents(documents) |
|
|
redis_vs = Redis.from_documents( |
|
|
documents=docs, |
|
|
embedding=self.embedding, |
|
|
redis_url=self.redis_server_url, |
|
|
index_name=self.redis_index_name, |
|
|
) |
|
|
return redis_vs |
|
|
|
|
|
def search_documents(self) -> list[Data]: |
|
|
vector_store = self.build_vector_store() |
|
|
|
|
|
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip(): |
|
|
docs = vector_store.similarity_search( |
|
|
query=self.search_query, |
|
|
k=self.number_of_results, |
|
|
) |
|
|
|
|
|
data = docs_to_data(docs) |
|
|
self.status = data |
|
|
return data |
|
|
return [] |
|
|
|