Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import shutil | |
| import tempfile | |
| import zipfile | |
| from faiss import IndexFlatL2 | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.docstore.in_memory import InMemoryDocstore | |
class VectorStoreManager:
    """Manage one LangChain FAISS vector store: create, edit, save, load, merge.

    The wrapped store lives in ``self.vectorstore`` and is ``None`` until a
    store has been created (by passing ``embeddings`` to the constructor) or
    loaded (via :meth:`load`).
    """

    # Message shared by every method that requires an initialized store.
    _NOT_INITIALIZED = (
        "Vector store is not initialized. Please create or load a vector store first."
    )

    def __init__(self, embeddings=None):
        """
        Initializes the VectorStoreManager with a FAISS vector store.

        Args:
            embeddings (Embeddings, optional): Embeddings model used for the
                vector store. When omitted, no store is created and
                ``self.vectorstore`` stays ``None``.
        """
        self.vectorstore = None
        if embeddings:
            self.vectorstore = self.create_vectorstore(embeddings)

    def _require_store(self):
        """Raise ValueError unless a vector store has been created or loaded."""
        # Explicit None check: an empty-but-valid store must never be mistaken
        # for "no store" should the store type ever define __len__/__bool__.
        if self.vectorstore is None:
            raise ValueError(self._NOT_INITIALIZED)

    @staticmethod
    def _load_faiss_store(file_input, embeddings):
        """Load a FAISS store from a path or a file-like object.

        Args:
            file_input (Union[file-like object, str]): A filename, or an object
                with a ``.read()`` method whose content is first spooled to a
                temporary file.
            embeddings (Embeddings): Embeddings model passed to ``FAISS.load_local``.

        Returns:
            tuple: ``(store, from_zip)`` where ``from_zip`` is True when the
            input was a ZIP archive that had to be extracted first.
        """
        temp_path = None
        try:
            if isinstance(file_input, str):
                source_path = file_input
            else:
                with tempfile.NamedTemporaryFile(delete=False) as tmp:
                    # Record the name before writing so the finally-clause can
                    # remove the file even if read()/write() fails.
                    temp_path = tmp.name
                    tmp.write(file_input.read())
                source_path = temp_path

            # SECURITY NOTE(review): allow_dangerous_deserialization=True makes
            # FAISS.load_local unpickle the stored docstore. Unpickling data
            # from an untrusted upload can execute arbitrary code -- only feed
            # this method files from trusted sources.
            if not zipfile.is_zipfile(source_path):
                store = FAISS.load_local(
                    source_path, embeddings, allow_dangerous_deserialization=True
                )
                return store, False

            with tempfile.TemporaryDirectory() as extract_dir:
                with zipfile.ZipFile(source_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)

                # An archive may hold the index files directly or wrap them in
                # a single top-level directory; descend only in the latter case.
                store_dir = extract_dir
                entries = os.listdir(extract_dir)
                if len(entries) == 1:
                    candidate = os.path.join(extract_dir, entries[0])
                    if os.path.isdir(candidate):
                        store_dir = candidate

                # Load while the extraction directory still exists.
                store = FAISS.load_local(
                    store_dir, embeddings, allow_dangerous_deserialization=True
                )
                return store, True
        finally:
            # Only remove the file we created; a caller-supplied path is kept.
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)

    def create_vectorstore(self, embeddings):
        """
        Creates and initializes an empty FAISS vector store.

        Args:
            embeddings (Embeddings): Embeddings model used for the vector store.

        Returns:
            FAISS: Initialized vector store.
        """
        # Embed a throwaway query to discover the embedding dimensionality.
        dimensions = len(embeddings.embed_query("dummy"))
        vectorstore = FAISS(
            embedding_function=embeddings,
            index=IndexFlatL2(dimensions),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
            normalize_L2=False,
        )
        print("Created a new FAISS vector store.")
        return vectorstore

    def add_documents(self, documents):
        """
        Adds new documents to the FAISS vector store, each with a unique UUID.

        Args:
            documents (iterable): Document objects to be added to the vector store.

        Returns:
            list: UUID strings for the added documents, in input order. Empty
            when no documents were supplied (the store is then left untouched).

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        self._require_store()
        documents = list(documents)
        if not documents:
            # Nothing to embed; skip the store call entirely.
            print("No documents provided; nothing was added to the vector store.")
            return []
        uuids = [str(uuid.uuid4()) for _ in documents]
        self.vectorstore.add_documents(documents=documents, ids=uuids)
        print(f"Added {len(documents)} documents to the vector store with IDs: {uuids}")
        return uuids

    def delete_documents(self, ids):
        """
        Deletes documents from the FAISS vector store using their unique IDs.

        Args:
            ids (list): UUIDs corresponding to the documents to be deleted.

        Returns:
            bool: False when ``ids`` is empty; otherwise whatever the store's
            ``delete`` call reports.

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        self._require_store()
        if not ids:
            print("No document IDs provided for deletion.")
            return False
        success = self.vectorstore.delete(ids=ids)
        if success:
            print(f"Successfully deleted documents with IDs: {ids}")
        else:
            print(f"Failed to delete documents with IDs: {ids}")
        return success

    def save(self, filename="faiss_index"):
        """
        Saves the current FAISS vector store locally. If the saved store is a
        directory, it is additionally compressed into ``<filename>.zip``.

        Args:
            filename (str): The filename or directory name where the vector
                store will be saved.

        Returns:
            dict: ``file_path`` (the ZIP when one was made, else ``filename``),
            ``media_type``, ``serve_filename`` (basename of ``file_path``) and
            ``original`` (the ``filename`` argument).

        Raises:
            ValueError: If no vector store has been created or loaded.
            FileNotFoundError: If nothing exists at ``filename`` after saving.
        """
        self._require_store()
        self.vectorstore.save_local(filename)
        print(f"Vector store saved to {filename}")
        if not os.path.exists(filename):
            raise FileNotFoundError("Saved vectorstore not found.")

        if os.path.isdir(filename):
            # make_archive appends ".zip" to the base name itself.
            file_path = filename + ".zip"
            shutil.make_archive(filename, 'zip', filename)
            media_type = "application/zip"
        else:
            file_path = filename
            media_type = "application/octet-stream"
        return {
            "file_path": file_path,
            "media_type": media_type,
            "serve_filename": os.path.basename(file_path),
            "original": filename,
        }

    @classmethod
    def load(cls, file_input, embeddings):
        """
        Loads a FAISS vector store from an uploaded file or a filename and
        wraps it in a new manager.

        If file_input is a file-like object, it is saved to a temporary file
        (removed afterwards). If it's a string (filename), it is used directly.
        ZIP archives are extracted before loading.

        The store is loaded with ``allow_dangerous_deserialization=True``; only
        pass files from trusted sources.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename.
            embeddings (Embeddings): Embeddings model used for loading the store.

        Returns:
            tuple: ``(manager, message)`` -- the new manager holding the loaded
            store and a human-readable status message.

        Raises:
            RuntimeError: If the store cannot be read or loaded; the original
                exception is chained as ``__cause__``.
        """
        try:
            new_vectorstore, from_zip = cls._load_faiss_store(file_input, embeddings)
        except Exception as e:
            raise RuntimeError(f"Error loading vectorstore: {e}") from e

        if from_zip:
            message = "Vector store loaded successfully from ZIP."
        else:
            message = "Vector store loaded successfully."
        instance = cls()
        instance.vectorstore = new_vectorstore
        print(message)
        return instance, message

    def merge(self, file_input, embeddings):
        """
        Merges an uploaded vector store file into the current FAISS vector store.

        The source store is loaded with ``allow_dangerous_deserialization=True``;
        only pass files from trusted sources.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename (str).
            embeddings (Embeddings): Embeddings model used for loading the store.

        Returns:
            dict: A dictionary containing a message indicating successful merging.

        Raises:
            ValueError: If no vector store has been created or loaded.
            RuntimeError: If loading or merging the source store fails; the
                original exception is chained as ``__cause__``.
        """
        # Fail fast, before reading the upload or touching the filesystem.
        self._require_store()
        try:
            source_store, _ = self._load_faiss_store(file_input, embeddings)
            self.vectorstore.merge_from(source_store)
            print("Successfully merged the source vector store into the current vector store.")
        except Exception as e:
            raise RuntimeError(f"Error merging vectorstore: {e}") from e
        return {"message": "Vector stores merged successfully"}