from typing import Dict, List, Optional, Tuple
import os
import numpy as np
import pandas as pd
import umap
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from sklearn.mixture import GaussianMixture
from langchain_community.chat_models import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from modules.vectorstore.base import VectorStoreBase

RANDOM_SEED = 42


class FAISS(FAISS):
    """Subclass the imported LangChain FAISS vector store to expose its size via len()."""

    def __len__(self):
        return self.index.ntotal


class RAPTORVectoreStore(VectorStoreBase):
    def __init__(self, config, documents=None, text_splitter=None, embedding_model=None):
        # Default to None rather than a mutable list so instances do not share state.
        self.documents = documents if documents is not None else []
        self.config = config
        self.text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=self.config["splitter_options"]["chunk_size"],
            chunk_overlap=self.config["splitter_options"]["chunk_overlap"],
            separators=self.config["splitter_options"]["chunk_separators"],
            disallowed_special=(),
        )
        self.embd = embedding_model
        self.model = ChatOpenAI(
            model="gpt-3.5-turbo",
        )

    def concat_documents(self, documents):
        # Sort documents by source, then reverse so they are joined in descending source order
        d_sorted = sorted(documents, key=lambda x: x.metadata["source"])
        d_reversed = list(reversed(d_sorted))
        concatenated_content = "\n\n\n --- \n\n\n".join(
            [doc.page_content for doc in d_reversed]
        )
        return concatenated_content

    def split_documents(self, documents):
        concatenated_content = self.concat_documents(documents)
        texts_split = self.text_splitter.split_text(concatenated_content)
        return texts_split

    def add_documents(self, documents):
        self.documents.extend(documents)
|
| | def global_cluster_embeddings( |
| | self, |
| | embeddings: np.ndarray, |
| | dim: int, |
| | n_neighbors: Optional[int] = None, |
| | metric: str = "cosine", |
| | ) -> np.ndarray: |
| | """ |
| | Perform global dimensionality reduction on the embeddings using UMAP. |
| | |
| | Parameters: |
| | - embeddings: The input embeddings as a numpy array. |
| | - dim: The target dimensionality for the reduced space. |
| | - n_neighbors: Optional; the number of neighbors to consider for each point. |
| | If not provided, it defaults to the square root of the number of embeddings. |
| | - metric: The distance metric to use for UMAP. |
| | |
| | Returns: |
| | - A numpy array of the embeddings reduced to the specified dimensionality. |
| | """ |
| | if n_neighbors is None: |
| | n_neighbors = int((len(embeddings) - 1) ** 0.5) |
| | return umap.UMAP( |
| | n_neighbors=n_neighbors, n_components=dim, metric=metric |
| | ).fit_transform(embeddings) |

    def local_cluster_embeddings(
        self,
        embeddings: np.ndarray,
        dim: int,
        num_neighbors: int = 10,
        metric: str = "cosine",
    ) -> np.ndarray:
        """
        Perform local dimensionality reduction on the embeddings using UMAP, typically after global clustering.

        Parameters:
        - embeddings: The input embeddings as a numpy array.
        - dim: The target dimensionality for the reduced space.
        - num_neighbors: The number of neighbors to consider for each point.
        - metric: The distance metric to use for UMAP.

        Returns:
        - A numpy array of the embeddings reduced to the specified dimensionality.
        """
        return umap.UMAP(
            n_neighbors=num_neighbors, n_components=dim, metric=metric
        ).fit_transform(embeddings)

    def get_optimal_clusters(
        self,
        embeddings: np.ndarray,
        max_clusters: int = 50,
        random_state: int = RANDOM_SEED,
    ) -> int:
        """
        Determine the optimal number of clusters using the Bayesian Information Criterion (BIC) with a Gaussian Mixture Model.

        Parameters:
        - embeddings: The input embeddings as a numpy array.
        - max_clusters: The maximum number of clusters to consider.
        - random_state: Seed for reproducibility.

        Returns:
        - An integer representing the optimal number of clusters found.
        """
        max_clusters = min(max_clusters, len(embeddings))
        n_clusters = np.arange(1, max_clusters)
        bics = []
        # Fit a GMM for each candidate cluster count and record its BIC score
        for n in n_clusters:
            gm = GaussianMixture(n_components=n, random_state=random_state)
            gm.fit(embeddings)
            bics.append(gm.bic(embeddings))
        # The optimal cluster count is the one minimizing the BIC
        return n_clusters[np.argmin(bics)]

    def GMM_cluster(
        self, embeddings: np.ndarray, threshold: float, random_state: int = 0
    ):
        """
        Cluster embeddings using a Gaussian Mixture Model (GMM) based on a probability threshold.

        Parameters:
        - embeddings: The input embeddings as a numpy array.
        - threshold: The probability threshold for assigning an embedding to a cluster.
        - random_state: Seed for reproducibility.

        Returns:
        - A tuple containing the cluster labels and the number of clusters determined.
        """
        n_clusters = self.get_optimal_clusters(embeddings)
        gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
        gm.fit(embeddings)
        probs = gm.predict_proba(embeddings)
        # An embedding may belong to multiple clusters when its probability exceeds the threshold
        labels = [np.where(prob > threshold)[0] for prob in probs]
        return labels, n_clusters

    def perform_clustering(
        self,
        embeddings: np.ndarray,
        dim: int,
        threshold: float,
    ) -> List[np.ndarray]:
        """
        Perform clustering on the embeddings by first reducing their dimensionality globally, then clustering
        using a Gaussian Mixture Model, and finally performing local clustering within each global cluster.

        Parameters:
        - embeddings: The input embeddings as a numpy array.
        - dim: The target dimensionality for UMAP reduction.
        - threshold: The probability threshold for assigning an embedding to a cluster in GMM.

        Returns:
        - A list of numpy arrays, where each array contains the cluster IDs for each embedding.
        """
        if len(embeddings) <= dim + 1:
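            # Avoid clustering when there is insufficient data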
            return [np.array([0]) for _ in range(len(embeddings))]

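        # Global dimensionality reduction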
        reduced_embeddings_global = self.global_cluster_embeddings(embeddings, dim)
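        # Global clustering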
        global_clusters, n_global_clusters = self.GMM_cluster(
            reduced_embeddings_global, threshold
        )

        all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
        total_clusters = 0

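        # Iterate through each global cluster to perform local clustering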
        for i in range(n_global_clusters):
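            # Extract embeddings belonging to the current global cluster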
            global_cluster_embeddings_ = embeddings[
                np.array([i in gc for gc in global_clusters])
            ]

            if len(global_cluster_embeddings_) == 0:
                continue
            if len(global_cluster_embeddings_) <= dim + 1:
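                # Handle small clusters with direct assignment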
                local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
                n_local_clusters = 1
            else:
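                # Perform local dimensionality reduction and clustering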
                reduced_embeddings_local = self.local_cluster_embeddings(
                    global_cluster_embeddings_, dim
                )
                local_clusters, n_local_clusters = self.GMM_cluster(
                    reduced_embeddings_local, threshold
                )

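            # Assign local cluster IDs, offset by the number of clusters already processed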
            for j in range(n_local_clusters):
                local_cluster_embeddings_ = global_cluster_embeddings_[
                    np.array([j in lc for lc in local_clusters])
                ]
                # Find the original indices of these embeddings to record their cluster IDs
                indices = np.where(
                    (embeddings == local_cluster_embeddings_[:, None]).all(-1)
                )[1]
                for idx in indices:
                    all_local_clusters[idx] = np.append(
                        all_local_clusters[idx], j + total_clusters
                    )

            total_clusters += n_local_clusters

        return all_local_clusters

    def embed(self, texts):
        """
        Generate embeddings for a list of text documents.

        This function assumes the existence of an `embd` object with a method `embed_documents`
        that takes a list of texts and returns their embeddings.

        Parameters:
        - texts: List[str], a list of text documents to be embedded.

        Returns:
        - numpy.ndarray: An array of embeddings for the given text documents.
        """
        text_embeddings = self.embd.embed_documents(texts)
        text_embeddings_np = np.array(text_embeddings)
        return text_embeddings_np

    def embed_cluster_texts(self, texts):
        """
        Embeds a list of texts and clusters them, returning a DataFrame with texts, their embeddings, and cluster labels.

        This function combines embedding generation and clustering into a single step. It assumes the existence
        of a previously defined `perform_clustering` function that performs clustering on the embeddings.

        Parameters:
        - texts: List[str], a list of text documents to be processed.

        Returns:
        - pandas.DataFrame: A DataFrame containing the original texts, their embeddings, and the assigned cluster labels.
        """
        text_embeddings_np = self.embed(texts)  # Generate embeddings
        cluster_labels = self.perform_clustering(
            text_embeddings_np, 10, 0.1
        )  # Cluster with UMAP dimension 10 and GMM probability threshold 0.1
        df = pd.DataFrame()  # Initialize a DataFrame to store the results
        df["text"] = texts  # Store the original texts
        df["embd"] = list(text_embeddings_np)  # Store embeddings as a list column
        df["cluster"] = cluster_labels  # Store the cluster labels
        return df

    def fmt_txt(self, df: pd.DataFrame) -> str:
        """
        Formats the text documents in a DataFrame into a single string.

        Parameters:
        - df: DataFrame containing the 'text' column with text documents to format.

        Returns:
        - A single string where all text documents are joined by a specific delimiter.
        """
        unique_txt = df["text"].tolist()
        return "--- --- \n --- --- ".join(unique_txt)

    def embed_cluster_summarize_texts(
        self, texts: List[str], level: int
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Embeds, clusters, and summarizes a list of texts. This function first generates embeddings for the texts,
        clusters them based on similarity, expands the cluster assignments for easier processing, and then summarizes
        the content within each cluster.

        Parameters:
        - texts: A list of text documents to be processed.
        - level: The current level in the recursive summarization hierarchy, recorded alongside each summary.

        Returns:
        - Tuple containing two DataFrames:
          1. The first DataFrame (`df_clusters`) includes the original texts, their embeddings, and cluster assignments.
          2. The second DataFrame (`df_summary`) contains summaries for each cluster, the recursion level,
             and the cluster identifiers.
        """
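        # Embed and cluster the texts, producing a DataFrame with 'text', 'embd', and 'cluster' columns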
        df_clusters = self.embed_cluster_texts(texts)

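        # Expand DataFrame entries into document-cluster pairings for simpler processing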
        expanded_list = []
        for index, row in df_clusters.iterrows():
            for cluster in row["cluster"]:
                expanded_list.append(
                    {"text": row["text"], "embd": row["embd"], "cluster": cluster}
                )
        expanded_df = pd.DataFrame(expanded_list)

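        # Retrieve unique cluster identifiers for processing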
        all_clusters = expanded_df["cluster"].unique()

        print(f"--Generated {len(all_clusters)} clusters--")

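        # Summarization prompt applied to the contents of each cluster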
| | template = """Here is content from the course DS598: Deep Learning for Data Science. |
| | The content may be form webapge about the course, or lecture content, or any other relevant information. |
| | If the content is in bullet points (from pdf lectre slides), you can summarize the bullet points. |
| | Give a detailed summary of the content below. |
| | Documentation: |
| | {context} |
| | """ |
| | prompt = ChatPromptTemplate.from_template(template) |
| | chain = prompt | self.model | StrOutputParser() |
| |
|
| | |
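        # Format the text within each cluster and generate its summary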
        summaries = []
        for i in all_clusters:
            df_cluster = expanded_df[expanded_df["cluster"] == i]
            formatted_txt = self.fmt_txt(df_cluster)
            summaries.append(chain.invoke({"context": formatted_txt}))

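        # Create a DataFrame to store summaries with their corresponding cluster and level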
        df_summary = pd.DataFrame(
            {
                "summaries": summaries,
                "level": [level] * len(summaries),
                "cluster": list(all_clusters),
            }
        )

        return df_clusters, df_summary

    def recursive_embed_cluster_summarize(
        self, texts: List[str], level: int = 1, n_levels: int = 3
    ) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:
        """
        Recursively embeds, clusters, and summarizes texts up to a specified level or until
        the number of unique clusters becomes 1, storing the results at each level.

        Parameters:
        - texts: List[str], texts to be processed.
        - level: int, current recursion level (starts at 1).
        - n_levels: int, maximum depth of recursion.

        Returns:
        - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion
          levels and values are tuples containing the clusters DataFrame and summaries DataFrame at that level.
        """
        results = {}

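        # Perform embedding, clustering, and summarization for the current level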
        df_clusters, df_summary = self.embed_cluster_summarize_texts(texts, level)

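        # Store the results of the current level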
        results[level] = (df_clusters, df_summary)

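        # Recurse only if the depth limit is not reached and more than one cluster remains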
        unique_clusters = df_summary["cluster"].nunique()
        if level < n_levels and unique_clusters > 1:
            # Use the summaries as the texts for the next level of recursion
            new_texts = df_summary["summaries"].tolist()
            next_level_results = self.recursive_embed_cluster_summarize(
                new_texts, level + 1, n_levels
            )

            # Merge the results from the next level into the current results dictionary
            results.update(next_level_results)

        return results

    def get_vector_db(self):
        """
        Build the RAPTOR vector store from `self.documents`.

        Splits the stored documents into leaf texts, recursively embeds, clusters, and
        summarizes them, then indexes the leaf texts together with every level of summaries.

        Returns:
        - A FAISS vectorstore over the leaf texts and all generated summaries.
        """
        leaf_texts = self.split_documents(self.documents)
        results = self.recursive_embed_cluster_summarize(
            leaf_texts, level=1, n_levels=10
        )

        all_texts = leaf_texts.copy()
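        # Iterate through the results to extract summaries from each level and add them to all_texts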
        for level in sorted(results.keys()):
            # Extract summaries from the current level's DataFrame
            summaries = results[level][1]["summaries"].tolist()
            # Extend all_texts with the summaries from the current level
            all_texts.extend(summaries)

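        # Build the final vectorstore over the combined texts with the configured embedding model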
        vectorstore = FAISS.from_texts(texts=all_texts, embedding=self.embd)
        return vectorstore

    def create_database(self, documents, embedding_model):
        self.documents = documents
        self.embd = embedding_model
        self.vectorstore = self.get_vector_db()
        self.vectorstore.save_local(
            os.path.join(
                self.config["vectorstore"]["db_path"],
                "db_"
                + self.config["vectorstore"]["db_option"]
                + "_"
                + self.config["vectorstore"]["model"],
            )
        )

    def load_database(self, embedding_model):
        self.vectorstore = FAISS.load_local(
            os.path.join(
                self.config["vectorstore"]["db_path"],
                "db_"
                + self.config["vectorstore"]["db_option"]
                + "_"
                + self.config["vectorstore"]["model"],
            ),
            embedding_model,
            allow_dangerous_deserialization=True,
        )
        return self.vectorstore

    def as_retriever(self):
        return self.vectorstore.as_retriever()