import os
from collections import defaultdict
from typing import Optional

import nltk
import numpy as np
import textract
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings

from src.prompts import DEFINE_QUERY_PROMPT


class AgglomerativeClustering:
    """Bottom-up clustering of consecutive items (e.g. sentence embeddings).

    Unlike generic agglomerative clustering, a merge is only allowed between
    clusters that are *adjacent* in the original ordering (the last index of
    one is immediately followed by the first index of the other), so each
    resulting cluster is a contiguous run of input items.
    """

    def __init__(self, n_clusters: int = 16):
        self.n_clusters = n_clusters
        # Large finite sentinel used instead of np.inf so arithmetic in
        # _average never produces NaN (inf * weight / weight pitfalls).
        self.inf = 1e16
        self.sample_size = 0
        self._distances: Optional[np.ndarray] = None

    def _init_clusters(self, X: np.ndarray):
        """Build the pairwise distance matrix (diagonal masked) and singleton clusters."""
        distances = self.distance(XA=X, XB=X) + np.eye(self.sample_size) * self.inf
        clusters = [[i] for i in range(self.sample_size)]
        return distances, clusters

    def _average(self, clusters, min_cluster, max_cluster):
        """Size-weighted average of the two clusters' distance rows (average linkage)."""
        size_a = len(clusters[min_cluster])
        size_b = len(clusters[max_cluster])
        return (self._distances[min_cluster] * size_a
                + self._distances[max_cluster] * size_b) / (size_a + size_b)

    def _get_params(self, counter):
        """Return (row, col) of the smallest entry in the distance matrix, ordered."""
        min_distance = np.argmin(self._distances)  # flat index over a counter x counter matrix
        param_1 = min_distance // counter
        param_2 = min_distance % counter
        return min(param_1, param_2), max(param_1, param_2)

    def _merge_clusters(self, clusters, min_cluster, max_cluster):
        """Fold max_cluster into min_cluster and shrink the distance matrix by one."""
        # fit_predict already wrote the merged row; mirror it into the column.
        self._distances[:, min_cluster] = self._distances[min_cluster, :]
        self._distances = np.delete(self._distances, max_cluster, axis=0)
        self._distances = np.delete(self._distances, max_cluster, axis=1)
        # Re-mask the diagonal with the same sentinel used everywhere else
        # (was np.inf, inconsistent with self.inf).
        self._distances[min_cluster, min_cluster] = self.inf
        clusters[min_cluster].extend(clusters[max_cluster])
        clusters.pop(max_cluster)

    def _get_labels(self, clusters):
        """Map each original sample index to the index of its final cluster."""
        result = [0] * self.sample_size
        for cluster_idx, members in enumerate(clusters):
            for point in members:
                result[point] = cluster_idx
        return result

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Cluster the rows of X into self.n_clusters contiguous groups.

        Args:
            X: (n_samples, n_features) array of embeddings.

        Returns:
            (n_samples,) array of cluster labels.
        """
        self.sample_size = X.shape[0]
        self._distances, clusters = self._init_clusters(X)
        while len(clusters) > self.n_clusters:
            min_cluster, max_cluster = self._get_params(len(clusters))
            if max(clusters[min_cluster]) + 1 == min(clusters[max_cluster]):
                # Adjacent clusters: merge with average linkage.
                self._distances[min_cluster] = self._average(
                    clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
                self._merge_clusters(
                    clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
            else:
                # Non-adjacent: mask this pair so argmin moves on.
                self._distances[min_cluster, max_cluster] = self.inf
                self._distances[max_cluster, min_cluster] = self.inf
        return np.array(self._get_labels(clusters))

    @staticmethod
    def distance(XA, XB):
        """Pairwise Euclidean distances between the rows of XA and XB."""
        return np.sqrt(((XA[:, np.newaxis] - XB[np.newaxis, :]) ** 2).sum(axis=2))


class CustomAgglomerativeSplitter:
    """Splits PDF documents into semantically coherent chunks.

    Pipeline: extract text per file -> sentence-tokenize -> embed sentences ->
    cluster adjacent sentences -> rebalance oversized chunks -> emit Documents.
    """

    def __init__(self, emb_model: str):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)

    @staticmethod
    def read_pdfs(path: str) -> tuple[list, list]:
        """Extract and sentence-tokenize every file in *path*.

        Returns:
            (pages, file_names) where pages[i] is the list of sentences of
            file_names[i].
        """
        pages = []
        file_names = []
        for file in os.listdir(path):
            raw = textract.process(os.path.join(path, file), method='pdfminer')
            text = raw.decode('utf-8').replace('\n', ' ')
            pages.append(nltk.sent_tokenize(text))
            file_names.append(file)
        return pages, file_names

    def get_embeddings(self, pages: list) -> list[np.ndarray]:
        """Embed each page's sentences; one (n_sentences, dim) array per page."""
        return [np.array(self._embeddings_model.embed_documents(texts)) for texts in pages]

    @staticmethod
    def split_list_by_indexes(data: list, indexes: list) -> list:
        """Group *data* elements by their parallel cluster label in *indexes*."""
        result_dict = defaultdict(list)
        for element, index in zip(data, indexes):
            result_dict[index].append(element)
        return list(result_dict.values())

    @staticmethod
    def balance_pages(pages: list, max_tokens: int = 256) -> list:
        """Join sentence groups into strings, splitting any group over *max_tokens* words."""
        balanced_pages = []
        for page in pages:
            str_page = ' '.join(page)
            n_words = len(str_page.split())
            if n_words > max_tokens:
                n_of_pages = int(np.ceil(n_words / max_tokens))
                balanced_pages.extend(
                    ' '.join(list(res)) for res in np.array_split(page, n_of_pages))
            else:
                balanced_pages.append(str_page)
        return balanced_pages

    def cluster_pages(self, pages: list, embeddings: list, file_names: list,
                      mean_n_of_sentences: int = 5) -> list:
        """Cluster each page's sentences into chunks of ~mean_n_of_sentences.

        Returns a flat list of Documents tagged with their source file name.
        """
        documents = []
        for page_number, page in enumerate(pages):
            sentence_embeddings = embeddings[page_number]
            # Guard against 0 clusters (page shorter than mean_n_of_sentences):
            # AgglomerativeClustering never terminates with n_clusters == 0.
            n_clusters = max(1, len(page) // mean_n_of_sentences)
            model = AgglomerativeClustering(n_clusters=n_clusters)
            labels = model.fit_predict(sentence_embeddings)
            page_docs = self.split_list_by_indexes(page, labels)
            page_docs = self.balance_pages(page_docs)
            documents.extend(
                Document(page_content=text,
                         metadata={"file_name": file_names[page_number]})
                for text in page_docs)
        return documents

    def read_and_split(self, path: str) -> list:
        """End-to-end: read PDFs under *path* and return chunked Documents."""
        pages, file_names = self.read_pdfs(path)
        embeddings = self.get_embeddings(pages)
        return self.cluster_pages(pages, embeddings, file_names)


class FaissDB:
    """Thin wrapper around a FAISS vector index with OpenAI embeddings."""

    def __init__(self, emb_model):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)
        self.index = None

    def init_index(self, documents: list[Document]):
        """Build a fresh index from *documents*."""
        self.index = FAISS.from_documents(documents, self._embeddings_model)

    def save_index(self, path: str):
        """Persist the index to disk.

        Raises:
            ValueError: if the index has not been built or loaded yet.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        self.index.save_local(path)

    def load_index(self, path: str):
        """Load a previously saved index from *path*.

        NOTE: allow_dangerous_deserialization unpickles the stored docstore —
        only load indexes you created yourself.
        """
        self.index = FAISS.load_local(path, self._embeddings_model,
                                      allow_dangerous_deserialization=True)

    def similarity_search(self, query: str, k: int = 4):
        """Return the page_content of the *k* documents nearest to *query*.

        Raises:
            ValueError: if the index has not been built or loaded yet.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        documents = self.index.similarity_search(query, k)
        return [doc.page_content for doc in documents]


class AICompletion:
    """Minimal chat-completion helper: system prompt + user text -> answer."""

    def __init__(self, chat_model: str = "gpt-4o", temperature: float = 0.0):
        self.human = "{text}"
        self.model = ChatOpenAI(model=chat_model, temperature=temperature)

    def get_answer(self, system_prompt: str, text: str) -> Optional[str]:
        """Run *text* through the model under *system_prompt* and return the reply."""
        prompt = ChatPromptTemplate.from_messages(
            [("system", system_prompt), ("human", self.human)])
        chain = prompt | self.model
        return chain.invoke({"text": text}).content


def define_query(query: str, chat_model: AICompletion) -> Optional[str]:
    """Classify *query* via DEFINE_QUERY_PROMPT; None when the model deems it unrelated."""
    result = chat_model.get_answer(DEFINE_QUERY_PROMPT, query)
    return result if result != "Unrelated." else None