from langchain_community.vectorstores import PGVector from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store from langflow.helpers.data import docs_to_data from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput from langflow.schema import Data from langflow.utils.connection_string_parser import transform_connection_string class PGVectorStoreComponent(LCVectorStoreComponent): display_name = "PGVector" description = "PGVector Vector Store with search capabilities" documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pgvector/" name = "pgvector" icon = "cpu" inputs = [ SecretStrInput(name="pg_server_url", display_name="PostgreSQL Server Connection String", required=True), StrInput(name="collection_name", display_name="Table", required=True), MultilineInput(name="search_query", display_name="Search Query"), DataInput( name="ingest_data", display_name="Ingestion Data", is_list=True, ), HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), IntInput( name="number_of_results", display_name="Number of Results", info="Number of results to return.", value=4, advanced=True, ), HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), ] @check_cached_vector_store def build_vector_store(self) -> PGVector: documents = [] for _input in self.ingest_data or []: if isinstance(_input, Data): documents.append(_input.to_lc_document()) else: documents.append(_input) connection_string_parsed = transform_connection_string(self.pg_server_url) if documents: pgvector = PGVector.from_documents( embedding=self.embedding, documents=documents, collection_name=self.collection_name, connection_string=connection_string_parsed, ) else: pgvector = PGVector.from_existing_index( embedding=self.embedding, collection_name=self.collection_name, connection_string=connection_string_parsed, ) return pgvector def search_documents(self) -> list[Data]: vector_store = self.build_vector_store() if self.search_query and isinstance(self.search_query, str) and self.search_query.strip(): docs = vector_store.similarity_search( query=self.search_query, k=self.number_of_results, ) data = docs_to_data(docs) self.status = data return data return []