# Provenance (upload-page residue, kept for traceability):
# uploaded by alexandraroze, commit 3025519, message: "fixed optinal" [sic: "optional"]
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.docstore.document import Document
import nltk
import os
import numpy as np
import textract
from collections import defaultdict
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from src.prompts import DEFINE_QUERY_PROMPT
from typing import Optional
class AgglomerativeClustering:
    """Bottom-up clustering that only merges *index-adjacent* clusters.

    Pairwise distances are Euclidean; on merge, the surviving cluster's
    distance row is updated with a size-weighted (average-linkage) rule.
    Because a pair is merged only when the two clusters cover consecutive
    sample indices (see ``fit_predict``), every resulting cluster is a
    contiguous run of input rows — suitable for segmenting an ordered
    sequence (e.g. the sentences of a document) into chunks.
    """

    def __init__(self, n_clusters: int = 16):
        # Target number of clusters at which merging stops.
        self.n_clusters = n_clusters
        # Large finite "infinity" used to exclude pairs from argmin searches.
        self.inf = 1e16
        self.sample_size = 0
        # Square pairwise distance matrix; shrinks by one row/col per merge.
        self._distances = None

    def _init_clusters(self, X: np.array):
        # Distance matrix with the diagonal masked out (a point must never be
        # merged with itself); every sample starts as a singleton cluster.
        distances = self.distance(XA=X, XB=X) + np.eye(self.sample_size) * self.inf
        clusters = [[i] for i in range(self.sample_size)]
        return distances, clusters

    def _average(self, clusters, min_cluster, max_cluster):
        # Size-weighted average of the two clusters' distance rows — the
        # average-linkage row for the merged cluster.
        return (self._distances[min_cluster] * len(clusters[min_cluster]) + self._distances[max_cluster] * len(
            clusters[max_cluster])) / (len(clusters[min_cluster]) + len(clusters[max_cluster]))

    def _get_params(self, counter):
        # argmin over the flattened matrix, then recover (row, col) of the
        # closest pair.  ``counter`` is the current matrix dimension, which
        # equals len(clusters).  Returned as (smaller index, larger index).
        min_distance = np.argmin(self._distances)
        param_1 = min_distance // counter
        param_2 = min_distance % counter
        return min(param_1, param_2), max(param_1, param_2)

    def _merge_clusters(self, clusters, min_cluster, max_cluster):
        # Mirror the merged row (already rewritten by the caller via
        # ``_average``) into the corresponding column, delete the absorbed
        # cluster's row and column, and mask the new diagonal entry.
        self._distances[:, min_cluster] = self._distances[min_cluster, :]
        self._distances = np.delete(self._distances, max_cluster, axis=0)
        self._distances = np.delete(self._distances, max_cluster, axis=1)
        self._distances[min_cluster][min_cluster] = np.inf
        clusters[min_cluster].extend(clusters[max_cluster])
        clusters.pop(max_cluster)

    def _get_labels(self, clusters):
        # Flatten cluster membership into a per-sample label vector:
        # result[sample_index] = cluster position in ``clusters``.
        result = [0] * self.sample_size
        for cluster in range(len(clusters)):
            for dote in clusters[cluster]:
                result[dote] = cluster
        return result

    def fit_predict(self, X: np.array) -> np.array:
        """Cluster the rows of ``X``; return one integer label per row.

        Repeatedly takes the globally closest pair; if the two clusters are
        index-adjacent (``max(left) + 1 == min(right)``) they are merged,
        otherwise the pair is permanently excluded by setting its distance to
        the "infinity" sentinel.

        NOTE(review): if ``n_clusters < 1`` this loop never terminates —
        ``len(clusters)`` bottoms out at 1 and the remaining 1x1 matrix can
        never satisfy the adjacency test.  Callers must pass n_clusters >= 1.
        """
        self.sample_size = X.shape[0]
        self._distances, clusters = self._init_clusters(X)
        while len(clusters) > self.n_clusters:
            min_cluster, max_cluster = self._get_params(len(clusters))
            if max(clusters[min_cluster]) + 1 == min(clusters[max_cluster]):
                self._distances[min_cluster] = self._average(clusters=clusters, min_cluster=min_cluster,
                                                             max_cluster=max_cluster)
                self._merge_clusters(clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
            else:
                # Non-adjacent pair: exclude it from all future argmin searches.
                self._distances[min_cluster, max_cluster] = self.inf
                self._distances[max_cluster, min_cluster] = self.inf
        return np.array(self._get_labels(clusters))

    @staticmethod
    def distance(XA, XB):
        # Full Euclidean distance matrix between the rows of XA and of XB.
        return np.sqrt(((XA[:, np.newaxis] - XB[np.newaxis, :]) ** 2).sum(axis=2))
class CustomAgglomerativeSplitter:
    """Splits PDF files into semantically coherent text chunks.

    Sentences are embedded with an OpenAI embedding model and grouped with
    the adjacency-constrained ``AgglomerativeClustering`` so that each chunk
    is a run of consecutive, semantically similar sentences; oversized chunks
    are then re-balanced by token count.
    """

    def __init__(self, emb_model: str):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)

    @staticmethod
    def read_pdfs(path: str) -> tuple[list, list]:
        """Extract sentences from every file in directory ``path``.

        Returns ``(pages, file_names)`` where ``pages[i]`` is the list of
        sentences tokenized from ``file_names[i]``.
        """
        files = os.listdir(path)
        pages = []
        file_names = []
        for file in files:
            # pdfminer output keeps layout newlines; flatten them so sentence
            # tokenization is not broken mid-sentence.
            page = textract.process(f"{path}/{file}", method='pdfminer').decode('utf-8').replace('\n', ' ')
            text = nltk.sent_tokenize(page)
            pages.append(text)
            file_names.append(file)
        return pages, file_names

    def get_embeddings(self, pages: list) -> list[np.ndarray]:
        """Embed each page's sentences; one (n_sentences, dim) array per page."""
        return [np.array(self._embeddings_model.embed_documents(texts)) for texts in pages]

    @staticmethod
    def split_list_by_indexes(data: list, indexes: list) -> list:
        """Group ``data`` elements by their parallel label in ``indexes``.

        Groups are returned in order of first appearance of each label.
        """
        result_dict = defaultdict(list)
        for element, index in zip(data, indexes):
            result_dict[index].append(element)
        return list(result_dict.values())

    @staticmethod
    def balance_pages(pages: list, max_tokens: int = 256) -> list:
        """Join each sentence group into one string, splitting any group whose
        whitespace-token count exceeds ``max_tokens`` into roughly equal parts.
        """
        balanced_pages = []
        for page in pages:
            str_page = ' '.join(page)
            if len(str_page.split()) > max_tokens:
                n_of_pages = int(np.ceil(len(str_page.split()) / max_tokens))
                result = [' '.join(list(res)) for res in np.array_split(page, n_of_pages)]
                balanced_pages.extend(result)
            else:
                balanced_pages.append(' '.join(page))
        return balanced_pages

    def cluster_pages(self, pages: list, embeddings: list, file_names: list, mean_n_of_sentences: int = 5) -> list:
        """Cluster each page's sentences into chunks of roughly
        ``mean_n_of_sentences`` sentences and wrap every chunk in a
        ``Document`` carrying its source file name in the metadata.
        """
        documents = []
        for page_number, page in enumerate(pages):
            sentence_embeddings = embeddings[page_number]
            # BUGFIX: clamp to at least 1 cluster.  For pages with fewer than
            # ``mean_n_of_sentences`` sentences the floor division yields 0,
            # and AgglomerativeClustering.fit_predict never terminates when
            # asked for 0 clusters (its merge loop cannot drop below 1).
            n_clusters = max(1, len(page) // mean_n_of_sentences)
            model = AgglomerativeClustering(n_clusters=n_clusters)
            labels = model.fit_predict(sentence_embeddings)
            page_docs = self.split_list_by_indexes(page, labels)
            page_docs = self.balance_pages(page_docs)
            documents.extend([
                Document(page_content=text, metadata={"file_name": file_names[page_number]}) for text in page_docs
            ])
        return documents

    def read_and_split(self, path: str) -> list:
        """Read PDFs from ``path`` and return the clustered ``Document`` chunks."""
        pages, file_names = self.read_pdfs(path)
        embeddings = self.get_embeddings(pages)
        return self.cluster_pages(pages, embeddings, file_names)
class FaissDB:
    """Thin wrapper around a FAISS vector index backed by OpenAI embeddings."""

    def __init__(self, emb_model):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)
        self.index = None

    def init_index(self, documents: list[Document]):
        """Build a fresh in-memory FAISS index from ``documents``."""
        self.index = FAISS.from_documents(documents, self._embeddings_model)

    def save_index(self, path: str):
        """Persist the current index to ``path``."""
        self.index.save_local(path)

    def load_index(self, path: str):
        """Restore a previously saved index from ``path``.

        NOTE(review): ``allow_dangerous_deserialization=True`` unpickles the
        stored docstore — only load index files from trusted sources.
        """
        self.index = FAISS.load_local(path, self._embeddings_model, allow_dangerous_deserialization=True)

    def similarity_search(self, query: str, k: int = 4):
        """Return the page contents of the ``k`` documents closest to ``query``.

        Raises ``ValueError`` when no index has been initialized or loaded.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        return [match.page_content for match in self.index.similarity_search(query, k)]
class AICompletion:
    """Convenience wrapper for a single system+human chat completion."""

    def __init__(self, chat_model: str = "gpt-4o", temperature: float = 0.0):
        # The human turn is a pass-through template filled in by get_answer.
        self.human = "{text}"
        self.model = ChatOpenAI(model=chat_model, temperature=temperature)

    def get_answer(self, system_prompt: str, text: str) -> Optional[str]:
        """Send ``text`` to the model under ``system_prompt``; return the reply text."""
        messages = [("system", system_prompt), ("human", self.human)]
        chain = ChatPromptTemplate.from_messages(messages) | self.model
        response = chain.invoke({"text": text})
        return response.content
def define_query(query: str, chat_model: AICompletion) -> Optional[str]:
    """Rewrite/classify ``query`` via the model; ``None`` when it is unrelated.

    The model is prompted with ``DEFINE_QUERY_PROMPT``; the sentinel reply
    "Unrelated." is mapped to ``None`` so callers can short-circuit.
    """
    answer = chat_model.get_answer(DEFINE_QUERY_PROMPT, query)
    if answer == "Unrelated.":
        return None
    return answer