from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.docstore.document import Document
import nltk
import os
import numpy as np
import textract
from collections import defaultdict
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from src.prompts import DEFINE_QUERY_PROMPT
from typing import Optional
class AgglomerativeClustering:
    """Bottom-up clustering restricted to merging *adjacent* clusters.

    Samples are assumed to be ordered (consecutive sentences of a page):
    two clusters may only merge when their index ranges are consecutive,
    so every final cluster is a contiguous run of input rows.
    """

    def __init__(self, n_clusters: int = 16):
        # Requested number of clusters; values < 1 are clamped to 1 inside
        # fit_predict (a target of 0 would otherwise loop forever once a
        # single cluster remains and no further merge is possible).
        self.n_clusters = n_clusters
        # Large *finite* sentinel masking forbidden merges; kept finite and
        # used consistently so averaged rows stay well-defined.
        self.inf = 1e16
        self.sample_size = 0
        self._distances = None

    def _init_clusters(self, X: np.array):
        """Return the initial (distance matrix, singleton clusters) pair."""
        # Mask the diagonal so a cluster is never merged with itself.
        distances = self.distance(XA=X, XB=X) + np.eye(self.sample_size) * self.inf
        clusters = [[i] for i in range(self.sample_size)]
        return distances, clusters

    def _average(self, clusters, min_cluster, max_cluster):
        """Size-weighted average of the two clusters' distance rows."""
        size_a = len(clusters[min_cluster])
        size_b = len(clusters[max_cluster])
        return (self._distances[min_cluster] * size_a
                + self._distances[max_cluster] * size_b) / (size_a + size_b)

    def _get_params(self, counter):
        """Return the (row, col) of the smallest matrix entry, ordered ascending."""
        # argmin on a 2-D array yields a flattened index; unflatten by the
        # current matrix width (= number of live clusters).
        flat_index = int(np.argmin(self._distances))
        row = flat_index // counter
        col = flat_index % counter
        return min(row, col), max(row, col)

    def _merge_clusters(self, clusters, min_cluster, max_cluster):
        """Fold max_cluster into min_cluster and shrink the matrix by one."""
        # Mirror the freshly averaged row into its column to keep symmetry.
        self._distances[:, min_cluster] = self._distances[min_cluster, :]
        self._distances = np.delete(self._distances, max_cluster, axis=0)
        self._distances = np.delete(self._distances, max_cluster, axis=1)
        # Re-mask the diagonal with the same finite sentinel used everywhere
        # else (the original mixed in np.inf here).
        self._distances[min_cluster][min_cluster] = self.inf
        clusters[min_cluster].extend(clusters[max_cluster])
        clusters.pop(max_cluster)

    def _get_labels(self, clusters):
        """Map every sample index to the id of its final cluster."""
        labels = [0] * self.sample_size
        for cluster_id, members in enumerate(clusters):
            for point in members:
                labels[point] = cluster_id
        return labels

    def fit_predict(self, X: np.array) -> np.array:
        """Cluster the rows of X; return one integer label per row."""
        self.sample_size = X.shape[0]
        # Clamp the target so n_clusters <= 0 cannot spin forever once only
        # one (unmergeable-with-itself) cluster is left.
        target = max(1, self.n_clusters)
        self._distances, clusters = self._init_clusters(X)
        while len(clusters) > target:
            min_cluster, max_cluster = self._get_params(len(clusters))
            if max(clusters[min_cluster]) + 1 == min(clusters[max_cluster]):
                # Index ranges are adjacent: merge the pair.
                self._distances[min_cluster] = self._average(
                    clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
                self._merge_clusters(clusters=clusters, min_cluster=min_cluster,
                                     max_cluster=max_cluster)
            else:
                # Non-adjacent: forbid this pair and pick the next-closest.
                self._distances[min_cluster, max_cluster] = self.inf
                self._distances[max_cluster, min_cluster] = self.inf
        return np.array(self._get_labels(clusters))

    @staticmethod
    def distance(XA, XB):
        """Pairwise Euclidean distance matrix between the rows of XA and XB."""
        return np.sqrt(((XA[:, np.newaxis] - XB[np.newaxis, :]) ** 2).sum(axis=2))
class CustomAgglomerativeSplitter:
    """Split PDF files into semantically coherent text chunks.

    Pipeline: extract text per file, sentence-tokenize, embed the
    sentences, group them with the adjacency-constrained
    AgglomerativeClustering, then re-balance groups to a token budget.
    """

    def __init__(self, emb_model: str):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)

    @staticmethod
    def read_pdfs(path: str) -> tuple[list, list]:
        """Read every file in *path*; return (per-file sentence lists, file names)."""
        files = os.listdir(path)
        pages = []
        file_names = []
        for file in files:
            # textract flattens the PDF to plain text; newlines are collapsed
            # so sentence tokenization is not broken by layout line breaks.
            page = textract.process(f"{path}/{file}", method='pdfminer').decode('utf-8').replace('\n', ' ')
            text = nltk.sent_tokenize(page)
            pages.append(text)
            file_names.append(file)
        return pages, file_names

    def get_embeddings(self, pages: list) -> list[np.array]:
        """Embed each page's sentences; one (n_sentences, dim) array per page."""
        return [np.array(self._embeddings_model.embed_documents(texts)) for texts in pages]

    @staticmethod
    def split_list_by_indexes(data: list, indexes: list) -> list:
        """Group *data* elements by their parallel index in *indexes*.

        Groups are returned in first-occurrence order of the indexes.
        """
        result_dict = defaultdict(list)
        for element, index in zip(data, indexes):
            result_dict[index].append(element)
        return list(result_dict.values())

    @staticmethod
    def balance_pages(pages: list, max_tokens: int = 256) -> list:
        """Join sentence groups to strings, splitting any over *max_tokens* words."""
        balanced_pages = []
        for page in pages:
            str_page = ' '.join(page)
            n_words = len(str_page.split())
            if n_words > max_tokens:
                # Split on sentence boundaries into roughly equal parts.
                n_of_pages = int(np.ceil(n_words / max_tokens))
                balanced_pages.extend(
                    ' '.join(list(part)) for part in np.array_split(page, n_of_pages))
            else:
                balanced_pages.append(str_page)
        return balanced_pages

    def cluster_pages(self, pages: list, embeddings: list, file_names: list, mean_n_of_sentences: int = 5) -> list:
        """Cluster each page's sentences and wrap the chunks as Documents."""
        documents = []
        for page_number, page in enumerate(pages):
            sentence_embeddings = embeddings[page_number]
            # Clamp to at least one cluster: a page with fewer sentences than
            # mean_n_of_sentences would otherwise request 0 clusters and send
            # the clusterer's merge loop into an infinite spin.
            n_clusters = max(1, len(page) // mean_n_of_sentences)
            model = AgglomerativeClustering(n_clusters=n_clusters)
            labels = model.fit_predict(sentence_embeddings)
            page_docs = self.split_list_by_indexes(page, labels)
            page_docs = self.balance_pages(page_docs)
            documents.extend([
                Document(page_content=text, metadata={"file_name": file_names[page_number]}) for text in page_docs
            ])
        return documents

    def read_and_split(self, path: str) -> list:
        """Read all PDFs under *path* and return them split into Documents."""
        pages, file_names = self.read_pdfs(path)
        embeddings = self.get_embeddings(pages)
        return self.cluster_pages(pages, embeddings, file_names)
class FaissDB:
    """Thin wrapper around a FAISS vector index built with OpenAI embeddings."""

    def __init__(self, emb_model):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)
        # Set by init_index / load_index; None until then.
        self.index = None

    def init_index(self, documents: list[Document]):
        """Build a fresh in-memory index from *documents*."""
        self.index = FAISS.from_documents(documents, self._embeddings_model)

    def save_index(self, path: str):
        """Persist the index to *path*.

        Raises:
            ValueError: if no index has been built or loaded yet (explicit
                guard, consistent with similarity_search, instead of an
                opaque AttributeError on None).
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        self.index.save_local(path)

    def load_index(self, path: str):
        """Load a previously saved index from *path*.

        NOTE(review): allow_dangerous_deserialization uses pickle under the
        hood — only load index files this application saved itself.
        """
        self.index = FAISS.load_local(path, self._embeddings_model, allow_dangerous_deserialization=True)

    def similarity_search(self, query: str, k: int = 4):
        """Return the page contents of the k documents closest to *query*.

        Raises:
            ValueError: if no index has been built or loaded yet.
        """
        if self.index is None:
            raise ValueError("Index is not initialized")
        documents = self.index.similarity_search(query, k)
        return [doc.page_content for doc in documents]
class AICompletion:
    """Minimal chat-completion helper around a LangChain chat model."""

    def __init__(self, chat_model: str = "gpt-4o", temperature: float = 0.0):
        # Placeholder template for the user turn; substituted in get_answer.
        self.human = "{text}"
        self.model = ChatOpenAI(model=chat_model, temperature=temperature)

    def get_answer(self, system_prompt: str, text: str) -> Optional[str]:
        """Send *text* to the model under *system_prompt*; return the reply text."""
        messages = [("system", system_prompt), ("human", self.human)]
        chain = ChatPromptTemplate.from_messages(messages) | self.model
        response = chain.invoke({"text": text})
        return response.content
def define_query(query: str, chat_model: AICompletion) -> Optional[str]:
    """Run *query* through the query-definition prompt.

    Returns None when the model deems the query unrelated (signalled by
    the literal reply "Unrelated."); otherwise the model's answer.
    """
    answer = chat_model.get_answer(DEFINE_QUERY_PROMPT, query)
    if answer == "Unrelated.":
        return None
    return answer