import os import shutil import json import logging from openai import AuthenticationError import chromadb from llama_index.core import Document from llama_index.vector_stores.chroma import ChromaVectorStore from llama_index.core.node_parser import TokenTextSplitter from llama_index.core.ingestion import IngestionPipeline from llama_index.core import SimpleKeywordTableIndex from llama_index.core.storage import StorageContext from llama_index.core import load_index_from_storage from llama_index.core import VectorStoreIndex from knowledgeBase.text_extraction_webpages import scrape_articles, scrape_pdfs from utils import format_collection_name class CollectionManager: def __init__(self, scraped_data_path='Data/output-processed-sources', vector_index_save_path='Data/query-engines/collections', keyword_index_save_path='Data/query-engines/keyword-index/', query_engines_info_json='Data/query-engines/query_engines_list.json'): self.scraped_data_path = scraped_data_path self.vector_index_save_path = vector_index_save_path self.keyword_index_save_path = keyword_index_save_path self.query_engines_info_json = query_engines_info_json def create_new_collection(self, user_models, path_json_file, type_json): """ Creates a new collection by processing the input JSON file and generating vector and keyword indices. Args: user_models (UserModels): The user models used for creating the collection. path_json_file (str): The path to the input JSON file containing the data. type_json (str): The type of JSON file, either 'Webpages' or 'PDFs'. Raises: ValueError: If the type_json is not 'Webpages' or 'PDFs'. FileNotFoundError: If the output file is not found. ValueError: If the output file contains invalid JSON format. Returns: None """ file_name = os.path.basename(path_json_file) dot_location = file_name.find('.') file_name_no_exten = file_name[0:dot_location] file_name_no_exten = format_collection_name(name=file_name_no_exten) # Extract text content of each entities in input json file output_file = None if type_json == 'Webpages': try: output_file = scrape_articles( json_file=path_json_file, output_file=os.path.join(self.scraped_data_path, file_name) ) except Exception as e: logging.error("An error occured: {}".format(e)) output_file = None elif type_json == 'PDFs': try: output_file = scrape_pdfs( json_file=path_json_file, output_file=os.path.join(self.scraped_data_path, file_name) ) except Exception as e: logging.error("An error occured: {}".format(e)) output_file = None else: raise ValueError('Selected Type of JSON file is incorrect.') try: with open(output_file, "r") as file: data = json.load(file) except FileNotFoundError: raise FileNotFoundError("The file was not found: {}.",format(output_file)) # Raising error here except json.JSONDecodeError: raise ValueError("Invalid JSON format: {}.".format(output_file)) # Convert text to Document object documents = [] for entity_i in data['data']: documents.append(Document( text=entity_i['Content'], metadata={'Link': entity_i['Link'], 'Name': entity_i['Name']}, excluded_llm_metadata_keys=[ "Name", "Link", ], excluded_embed_metadata_keys=[ "Link" ], ) ) # Create vector index nodes = self.__create_vector_index( user_models=user_models, documents=documents, collection_name=file_name_no_exten ) # Create keyword index self.__create_keyword_index( nodes=nodes, collection_name=file_name_no_exten, model_llm=user_models.model_llm ) # Save the details of the created vector store self.__save_query_engine_info( user_models=user_models, collection_name=file_name_no_exten, collection_description=data['description'] ) def __create_vector_index(self, user_models, documents, collection_name): """ Creates a vector index for the given documents using the specified user models and collection name. Args: user_models (object): An object containing user-defined models for embedding. documents (list): A list of documents to be indexed. collection_name (str): The name of the collection to be created in the vector database. Returns: list: A list of nodes resulting from the transformation pipeline. Raises: ValueError: If an authentication error occurs or any other unexpected error is encountered. """ # Path to save collection collection_path = os.path.join(self.vector_index_save_path, collection_name) #Vector based database to store docs, their embeddings, ... logging.info("> Creating {} Vector Index ...".format(collection_name)) chroma_client = chromadb.PersistentClient(path=collection_path) chroma_collection = chroma_client.create_collection(name=collection_name) # Define a storage context object using the created vector database. vector_store = ChromaVectorStore(chroma_collection=chroma_collection) token_spliter = TokenTextSplitter(chunk_size=800, chunk_overlap=0, separator=" ") # Create the pipeline to apply the transformation on each document, # and store the transformed nodes in the vector store. pipeline = IngestionPipeline( transformations=[ token_spliter, # Split documents to chunks user_models.model_embd, # Convert to embedding vector ], vector_store=vector_store ) # Run the transformation pipeline. try: nodes = pipeline.run(documents=documents, show_progress=True) except AuthenticationError: raise ValueError("Authentication error: Incorrect API key provided.") except Exception as e: raise ValueError(f"An unexpected error occurred: {e}") return nodes def __create_keyword_index(self, nodes, collection_name, model_llm): """ Creates a keyword index for the given nodes and collection name. This method initializes a SimpleKeywordTableIndex with the provided nodes and LLM model, logs the creation process, and persists the index to a specified directory. Args: nodes (list): A list of nodes to be indexed. collection_name (str): The name of the collection for which the keyword index is being created. model_llm (object): The language model to be used for creating the keyword index. Returns: None """ logging.info("> Creating {} Keyword Index ...".format(collection_name)) # Initialize the SimpleKeywordTableIndex with the service context keyword_index = SimpleKeywordTableIndex(nodes=nodes, llm=model_llm, show_progress=True) # Define the directory path os.makedirs(self.keyword_index_save_path, exist_ok=True) # Persist the index with a specific ID persist_directory = os.path.join(self.keyword_index_save_path, collection_name) keyword_index.storage_context.persist(persist_directory) def __save_query_engine_info(self, user_models, collection_name, collection_description): """ Saves information about the query engine to a JSON file. This method adds details of the created vector store to a list of vector stores stored in a JSON file. If the JSON file does not exist, it creates an empty list and then appends the new entry. Args: user_models: An object containing user model information, specifically the embedding name. collection_name (str): The name of the collection to be saved. collection_description (str): A description of the collection to be saved. Raises: IOError: If there is an error reading or writing to the JSON file. """ # Add detail of created vector store to list of vector stores if not os.path.exists(self.query_engines_info_json): with open(self.query_engines_info_json, 'w') as file: json.dump([], file) vec_store_desc=[] with open(self.query_engines_info_json, 'r') as file: vec_store_desc = json.load(file) new_entry = { "name": collection_name, "description": collection_description, "embedding_name": user_models.embedding_name } vec_store_desc.append(new_entry) with open(self.query_engines_info_json, 'w') as file: json.dump(vec_store_desc, file) def delete_query_engine_by_name(self, name): """ Deletes a query engine by its name. This method performs the following actions: 1. Deletes the vector store associated with the query engine. 2. Deletes the keyword index directory associated with the query engine. 3. Updates the list of query engines by removing the entry with the specified name. Args: name (str): The name of the query engine to be deleted. Raises: FileNotFoundError: If the query engines info JSON file does not exist. json.JSONDecodeError: If the query engines info JSON file contains invalid JSON. """ # Path to save collection collection_path = os.path.join(self.vector_index_save_path, name) # Delete the vector store if os.path.exists(collection_path): shutil.rmtree(collection_path) print("The folder has been deleted successfully!") else: print("The folder does not exist.") # Delete the keyword index directory_path = self.keyword_index_save_path persist_directory = os.path.join(directory_path, name) os.system("rm -rf {}".format(persist_directory)) # Update the list of query engines with open(self.query_engines_info_json, 'r') as file: vec_store_desc = json.load(file) vec_store_desc = [i for i in vec_store_desc if i['name'] != name] with open(self.query_engines_info_json, 'w') as file: json.dump(vec_store_desc, file) def load_vector_index_from_file(self, query_engine_name, model_embd): """ Load a vector index from a file based on the query engine name and embedding model. Args: query_engine_name (str): The name of the query engine to load. model_embd: The embedding model to use for the vector store index. Returns: VectorStoreIndex: The loaded vector store index if the query engine is found, otherwise None. """ qe_details = self.get_query_engines_detail() loc = -1 for idx, qe_i in enumerate(qe_details): if qe_i['name'] == query_engine_name: loc = idx break if loc == -1: return None # Path to save collection collection_path = os.path.join(self.vector_index_save_path, query_engine_name) # Load query engine from database chroma_client = chromadb.PersistentClient(path=collection_path) chroma_collection = chroma_client.get_collection(name=query_engine_name) vector_store = ChromaVectorStore(chroma_collection=chroma_collection) vector_store_index = VectorStoreIndex.from_vector_store(vector_store, embed_model=model_embd) return vector_store_index def load_keyword_index_from_file(self, query_engine_name, model_llm): """ Load the keyword index from a file. This method rebuilds the storage context using the specified query engine name and loads the keyword index from the storage using the provided LLM model. Args: query_engine_name (str): The name of the query engine. model_llm (Any): The language model to be used for loading the index. Returns: keyword_index: The loaded keyword index. """ # Rebuild the storage context storage_context = StorageContext.from_defaults( persist_dir=os.path.join(self.keyword_index_save_path, query_engine_name) ) keyword_index = load_index_from_storage(storage_context=storage_context, index_id=None, llm=model_llm) return keyword_index def get_query_engines_detail(self): """ Retrieves the details of query engines from a JSON file. This method checks if the JSON file specified by `self.query_engines_info_json` exists. If the file does not exist, it returns an empty list. If the file exists, it reads the contents of the file and returns it as a list. Returns: list: A list containing the details of query engines. If the file does not exist, an empty list is returned. """ if not os.path.exists(self.query_engines_info_json): return [] vec_store_desc=[] with open(self.query_engines_info_json, 'r') as file: vec_store_desc = json.load(file) return vec_store_desc def get_query_engines_detail_by_name(self, query_engine_names): """ Retrieves detailed information about specific query engines by their names. Args: query_engine_names (list of str): A list of query engine names to filter the details. Returns: list of dict: A list of dictionaries containing the details of the query engines that match the provided names. """ vec_store_desc = self.get_query_engines_detail() filtered_vec_store_desc = [] for qe_i in vec_store_desc: if qe_i['name'] in query_engine_names: filtered_vec_store_desc.append(qe_i) return filtered_vec_store_desc def get_query_engines_name(self): """ Retrieve the names of query engines. This method fetches the details of query engines and extracts their names. Returns: list: A list of names of the query engines. """ vec_store_desc=self.get_query_engines_detail() return [vs_i['name'] for vs_i in vec_store_desc]