Spaces:
Running
Running
| import json | |
| from typing import Any | |
| from langchain_community.vectorstores import OpenSearchVectorSearch | |
| from loguru import logger | |
| from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
| from langflow.io import ( | |
| BoolInput, | |
| DataInput, | |
| DropdownInput, | |
| FloatInput, | |
| HandleInput, | |
| IntInput, | |
| MultilineInput, | |
| SecretStrInput, | |
| StrInput, | |
| ) | |
| from langflow.schema import Data | |
class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities.

    Builds a LangChain ``OpenSearchVectorSearch`` store from the configured
    connection inputs, optionally ingests connected ``Data`` objects into it,
    and searches it in one of three ways:

    * a raw JSON query (``hybrid_search_query``);
    * an unranked ``match_all`` query when the search input is empty;
    * similarity, similarity-with-score-threshold, or MMR search.
    """

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/opensearch"
    name = "OpenSearch"
    icon = "OpenSearch"

    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info=(
                "Enter a search query. Leave empty to retrieve all documents. "
                "If you need a more advanced search consider using Hybrid Search Query instead."
            ),
            value="",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    # check_cached_vector_store (imported from langflow's vector-store base) is
    # meant to reuse an already-built store. Without it, every search() call
    # rebuilds the store and re-ingests ingest_data.
    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Build the OpenSearch vector store and ingest any connected data.

        Returns:
            The configured ``OpenSearchVectorSearch`` instance.

        Raises:
            RuntimeError: If the store cannot be constructed or ingestion fails.
            TypeError: If an ``ingest_data`` item is not a ``Data`` object.
        """
        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                # NOTE(review): hostname checking is disabled unconditionally, even
                # when verify_certs is True; consider tying it to verify_certs.
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Convert ``ingest_data`` items to LangChain documents and add them to the store.

        Documents are only added when there is at least one and an embedding is set.

        Raises:
            TypeError: If an item in ``ingest_data`` is not a ``Data`` object.
            RuntimeError: If the vector store rejects the documents.
        """
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                error_message = f"Expected Data object, got {type(_input)}"
                logger.error(error_message)
                raise TypeError(error_message)

        if documents and self.embedding is not None:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                error_message = f"Error adding documents to Vector Store: {e}"
                logger.exception(error_message)
                raise RuntimeError(error_message) from e
        else:
            logger.debug("No documents to add to the Vector Store.")

    @staticmethod
    def _process_hits(response: dict[str, Any]) -> list[dict[str, Any]]:
        """Convert a raw OpenSearch search response into page_content/metadata dicts.

        Reads ``_source.text`` and ``_source.metadata`` from each hit. A ``text``
        value that is itself a dict is unwrapped via its ``"text"`` key.
        """
        processed_results = []
        for hit in response.get("hits", {}).get("hits", []):
            source = hit.get("_source", {})
            text = source.get("text", "")
            if isinstance(text, dict):
                text = text.get("text", "")
            processed_results.append(
                {
                    "page_content": text,
                    "metadata": source.get("metadata", {}),
                }
            )
        return processed_results

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search the vector store.

        Resolution order:

        1. If ``hybrid_search_query`` is non-empty, it is parsed as JSON and sent
           as the body of an OpenSearch ``search`` request; ``query`` and
           ``search_type`` are ignored.
        2. If ``query`` is empty or whitespace-only, an unranked ``match_all``
           query returns up to ``number_of_results`` documents. Embedding an
           empty string would not "retrieve all documents" as the input's help
           text states.
        3. Otherwise ``search_type`` selects similarity, similarity with a score
           threshold, or MMR search, returning up to ``number_of_results`` hits.

        Args:
            query: Text to search for. ``None`` is treated as empty.

        Returns:
            Dicts with ``page_content`` and ``metadata`` keys. Score-threshold
            search also includes ``score``.

        Raises:
            RuntimeError: If building the store, parsing the hybrid query, or the
                search itself fails.
            ValueError: If ``search_type`` is not a supported option.
        """
        try:
            vector_store = self.build_vector_store()
            query = query or ""

            # Guard against a None field value before calling .strip().
            hybrid_query_text = (self.hybrid_search_query or "").strip()
            if hybrid_query_text:
                try:
                    hybrid_query = json.loads(hybrid_query_text)
                except json.JSONDecodeError as e:
                    error_message = f"Invalid hybrid search query JSON: {e}"
                    logger.exception(error_message)
                    raise ValueError(error_message) from e
                results = vector_store.client.search(index=self.index_name, body=hybrid_query)
                return self._process_hits(results)

            if not query.strip():
                match_all_query = {"query": {"match_all": {}}, "size": self.number_of_results}
                results = vector_store.client.search(index=self.index_name, body=match_all_query)
                return self._process_hits(results)

            search_kwargs = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]

            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

        except Exception as e:
            error_message = f"Error during search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        # Reached only when no search_type branch matched. It is raised outside
        # the try so callers receive a ValueError, not a wrapped RuntimeError.
        error_message = f"Error during search. Invalid search type: {self.search_type}"
        logger.error(error_message)
        raise ValueError(error_message)

    def search_documents(self) -> list[Data]:
        """Search using ``search_input`` and return the hits as ``Data`` objects.

        An empty or whitespace-only ``search_input`` is passed to ``search`` as
        ``None``. Each result becomes a ``Data`` carrying its page content as
        ``text`` and the ``file_path`` metadata entry, defaulting to ``""``.
        The list is also stored in ``self.status``.

        Raises:
            RuntimeError: Wrapping any error raised while searching.
        """
        try:
            query = self.search_input.strip() if self.search_input else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    file_path=result["metadata"].get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
        except Exception as e:
            error_message = f"Error during document search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        self.status = retrieved_data
        return retrieved_data