Spaces:
Sleeping
Sleeping
import os
from collections import defaultdict
from typing import Optional

import nltk
import numpy as np
import textract
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings

from src.prompts import DEFINE_QUERY_PROMPT
class AgglomerativeClustering:
    """Bottom-up (agglomerative) clustering restricted to *adjacent* merges.

    Rows of X are assumed to be in document order (sentence embeddings); two
    clusters may merge only when one immediately follows the other, so every
    resulting cluster covers a contiguous run of input rows — exactly what is
    needed to split ordered text into coherent chunks.
    """

    def __init__(self, n_clusters: int = 16):
        self.n_clusters = n_clusters  # stop merging once this many clusters remain
        # Large finite sentinel (instead of np.inf) so np.argmin stays well defined
        # even if a whole row gets masked.
        self.inf = 1e16
        self.sample_size = 0
        self._distances = None  # square distance matrix, shrunk in place as clusters merge

    @staticmethod
    def distance(XA: np.ndarray, XB: np.ndarray) -> np.ndarray:
        """Pairwise Euclidean distance between the rows of XA and the rows of XB.

        NOTE: this was previously a plain method without ``self``; calling it as
        ``self.distance(XA=..., XB=...)`` raised ``TypeError: got multiple values
        for argument 'XA'``. It is now a staticmethod, which fixes that call.
        """
        return np.sqrt(((XA[:, np.newaxis] - XB[np.newaxis, :]) ** 2).sum(axis=2))

    def _init_clusters(self, X: np.ndarray):
        """Build the initial distance matrix and one singleton cluster per row."""
        # Mask the diagonal so a cluster is never matched with itself.
        distances = self.distance(XA=X, XB=X) + np.eye(self.sample_size) * self.inf
        clusters = [[i] for i in range(self.sample_size)]
        return distances, clusters

    def _average(self, clusters, min_cluster, max_cluster):
        """Size-weighted average of the two merged rows (average linkage)."""
        return (self._distances[min_cluster] * len(clusters[min_cluster])
                + self._distances[max_cluster] * len(clusters[max_cluster])
                ) / (len(clusters[min_cluster]) + len(clusters[max_cluster]))

    def _get_params(self, counter):
        """Return (row, col) of the smallest pairwise distance, ordered ascending."""
        min_distance = np.argmin(self._distances)  # flat index into counter x counter matrix
        param_1 = min_distance // counter
        param_2 = min_distance % counter
        return min(param_1, param_2), max(param_1, param_2)

    def _merge_clusters(self, clusters, min_cluster, max_cluster):
        """Fold max_cluster into min_cluster and shrink the distance matrix."""
        self._distances[:, min_cluster] = self._distances[min_cluster, :]  # keep symmetry
        self._distances = np.delete(self._distances, max_cluster, axis=0)
        self._distances = np.delete(self._distances, max_cluster, axis=1)
        self._distances[min_cluster][min_cluster] = np.inf  # re-mask the merged diagonal
        clusters[min_cluster].extend(clusters[max_cluster])
        clusters.pop(max_cluster)

    def _get_labels(self, clusters):
        """Convert the list-of-index-lists representation into flat per-row labels."""
        result = [0] * self.sample_size
        for cluster in range(len(clusters)):
            for dote in clusters[cluster]:
                result[dote] = cluster
        return result

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Cluster the rows of X; return an integer label per row."""
        self.sample_size = X.shape[0]
        self._distances, clusters = self._init_clusters(X)
        while len(clusters) > self.n_clusters:
            min_cluster, max_cluster = self._get_params(len(clusters))
            # Only merge clusters that are adjacent in the original row order.
            if max(clusters[min_cluster]) + 1 == min(clusters[max_cluster]):
                self._distances[min_cluster] = self._average(clusters=clusters, min_cluster=min_cluster,
                                                             max_cluster=max_cluster)
                self._merge_clusters(clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
            else:
                # Non-adjacent pair: rule it out and fall through to the next closest pair.
                self._distances[min_cluster, max_cluster] = self.inf
                self._distances[max_cluster, min_cluster] = self.inf
        return np.array(self._get_labels(clusters))
class CustomAgglomerativeSplitter:
    """Splits PDF documents into semantically coherent chunks.

    Pipeline: extract text per file, tokenize into sentences, embed sentences
    with an OpenAI embedding model, group them with the adjacency-constrained
    AgglomerativeClustering, then re-balance groups against a token budget.
    """

    def __init__(self, emb_model: str):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)

    @staticmethod
    def read_pdfs(path: str) -> tuple[list, list]:
        """Extract and sentence-tokenize every file under ``path``.

        Returns (pages, file_names) where pages[i] is the list of sentences of
        file_names[i]. (Was missing @staticmethod: calling it via
        ``self.read_pdfs(path)`` raised a TypeError.)
        """
        files = os.listdir(path)
        pages = []
        file_names = []
        for file in files:
            # Newlines are collapsed so hard line wraps do not break
            # sentence tokenization.
            page = textract.process(f"{path}/{file}", method='pdfminer').decode('utf-8').replace('\n', ' ')
            text = nltk.sent_tokenize(page)
            pages.append(text)
            file_names.append(file)
        return pages, file_names

    def get_embeddings(self, pages: list) -> list[np.ndarray]:
        """Embed each page's sentences; one (n_sentences, dim) array per page."""
        return [np.array(self._embeddings_model.embed_documents(texts)) for texts in pages]

    @staticmethod
    def split_list_by_indexes(data: list, indexes: list) -> list:
        """Group ``data`` elements by their parallel cluster label in ``indexes``.

        Groups keep first-seen label order. (Was missing @staticmethod.)
        """
        result_dict = defaultdict(list)
        for element, index in zip(data, indexes):
            result_dict[index].append(element)
        return list(result_dict.values())

    @staticmethod
    def balance_pages(pages: list, max_tokens: int = 256) -> list:
        """Join sentence groups into strings, splitting any group whose
        whitespace-token count exceeds ``max_tokens`` into near-equal parts.
        (Was missing @staticmethod.)
        """
        balanced_pages = []
        for page in pages:
            str_page = ' '.join(page)
            n_tokens = len(str_page.split())
            if n_tokens > max_tokens:
                n_of_pages = int(np.ceil(n_tokens / max_tokens))
                result = [' '.join(list(res)) for res in np.array_split(page, n_of_pages)]
                balanced_pages.extend(result)
            else:
                balanced_pages.append(str_page)
        return balanced_pages

    def cluster_pages(self, pages: list, embeddings: list, file_names: list, mean_n_of_sentences: int = 5) -> list:
        """Cluster each page's sentences into ~mean_n_of_sentences-sized groups
        and wrap the balanced chunks as Documents tagged with their source file."""
        documents = []
        for page_number, page in enumerate(pages):
            sentence_embeddings = embeddings[page_number]
            # Guard against 0 clusters for short pages — with n_clusters == 0 the
            # clustering loop could never terminate.
            n_clusters = max(1, len(page) // mean_n_of_sentences)
            model = AgglomerativeClustering(n_clusters=n_clusters)
            labels = model.fit_predict(sentence_embeddings)
            page_docs = self.split_list_by_indexes(page, labels)
            page_docs = self.balance_pages(page_docs)
            documents.extend([
                Document(page_content=text, metadata={"file_name": file_names[page_number]}) for text in page_docs
            ])
        return documents

    def read_and_split(self, path: str) -> list:
        """End-to-end: read PDFs from ``path``, embed, cluster, return Documents."""
        pages, file_names = self.read_pdfs(path)
        embeddings = self.get_embeddings(pages)
        return self.cluster_pages(pages, embeddings, file_names)
class FaissDB:
    """Thin wrapper around a FAISS vector index built with OpenAI embeddings."""

    def __init__(self, emb_model):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)
        self.index = None  # set by init_index() or load_index()

    def init_index(self, documents: list[Document]):
        """Build a fresh index from already-split documents."""
        self.index = FAISS.from_documents(documents, self._embeddings_model)

    def save_index(self, path: str):
        """Persist the index to disk.

        Raises ValueError (instead of an opaque AttributeError on None) when
        the index has not been initialized — consistent with similarity_search.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        self.index.save_local(path)

    def load_index(self, path: str):
        """Load a previously saved index from disk.

        NOTE: allow_dangerous_deserialization uses pickle under the hood —
        only load index files this application wrote itself.
        """
        self.index = FAISS.load_local(path, self._embeddings_model, allow_dangerous_deserialization=True)

    def similarity_search(self, query: str, k: int = 4):
        """Return the page contents of the k documents most similar to ``query``.

        Raises ValueError when the index has not been initialized.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        documents = self.index.similarity_search(query, k)
        return [doc.page_content for doc in documents]
class AICompletion:
    """Minimal chat-completion helper: one system prompt plus one user message."""

    def __init__(self, chat_model: str = "gpt-4o", temperature: float = 0.0):
        # Template placeholder that receives the user's text on invoke().
        self.human = "{text}"
        self.model = ChatOpenAI(model=chat_model, temperature=temperature)

    def get_answer(self, system_prompt: str, text: str) -> Optional[str]:
        """Send ``text`` under ``system_prompt`` and return the model's reply text."""
        messages = [("system", system_prompt), ("human", self.human)]
        chain = ChatPromptTemplate.from_messages(messages) | self.model
        response = chain.invoke({"text": text})
        return response.content
def define_query(query: str, chat_model: AICompletion) -> Optional[str]:
    """Run ``query`` through the DEFINE_QUERY_PROMPT classifier.

    Returns the model's answer, or None when the model replies "Unrelated.".
    """
    answer = chat_model.get_answer(DEFINE_QUERY_PROMPT, query)
    if answer == "Unrelated.":
        return None
    return answer