import os
from collections import defaultdict
from typing import Optional

import nltk
import numpy as np
import textract
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings

from src.prompts import DEFINE_QUERY_PROMPT

# nltk.sent_tokenize relies on the "punkt" tokenizer models; fetch them once if missing.
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt", quiet=True)


class AgglomerativeClustering:
    """Bottom-up clustering that only merges *adjacent* clusters, so every
    resulting cluster is a contiguous run of sentences within a page."""

    def __init__(self, n_clusters: int = 16):
        self.n_clusters = n_clusters
        self.inf = 1e16  # large finite penalty used to deprioritize non-adjacent pairs
        self.sample_size = 0
        self._distances = None

    def _init_clusters(self, X: np.ndarray):
        # Pairwise distance matrix with the diagonal masked out, plus one
        # singleton cluster per sample.
        distances = self.distance(XA=X, XB=X) + np.eye(self.sample_size) * self.inf
        clusters = [[i] for i in range(self.sample_size)]
        return distances, clusters

    def _average(self, clusters, min_cluster, max_cluster):
        # Average linkage: the merged cluster's distance row is the
        # size-weighted mean of the two parents' rows.
        return (
            self._distances[min_cluster] * len(clusters[min_cluster])
            + self._distances[max_cluster] * len(clusters[max_cluster])
        ) / (len(clusters[min_cluster]) + len(clusters[max_cluster]))

    def _get_params(self, counter):
        # Locate the closest pair: argmin over the flattened matrix, then
        # recover (row, col) for the current matrix width `counter`.
        flat_index = np.argmin(self._distances)
        row, col = divmod(flat_index, counter)
        return min(row, col), max(row, col)

    def _merge_clusters(self, clusters, min_cluster, max_cluster):
        # The caller has already written the merged distances into row
        # `min_cluster`; mirror them into the column, drop the absorbed
        # cluster's row/column, and mask the diagonal.
        self._distances[:, min_cluster] = self._distances[min_cluster, :]
        self._distances = np.delete(self._distances, max_cluster, axis=0)
        self._distances = np.delete(self._distances, max_cluster, axis=1)
        self._distances[min_cluster, min_cluster] = np.inf
        clusters[min_cluster].extend(clusters[max_cluster])
        clusters.pop(max_cluster)

    def _get_labels(self, clusters):
        # Flatten the cluster lists into a per-sample label array.
        result = [0] * self.sample_size
        for label, cluster in enumerate(clusters):
            for point in cluster:
                result[point] = label
        return result

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        self.sample_size = X.shape[0]
        self._distances, clusters = self._init_clusters(X)

        while len(clusters) > self.n_clusters:
            min_cluster, max_cluster = self._get_params(len(clusters))
            if max(clusters[min_cluster]) + 1 == min(clusters[max_cluster]):
                # The two clusters cover consecutive sentence ranges: merge them.
                self._distances[min_cluster] = self._average(
                    clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster
                )
                self._merge_clusters(clusters=clusters, min_cluster=min_cluster, max_cluster=max_cluster)
            else:
                # Not adjacent in the text: penalize the pair so a contiguous
                # pair gets picked on a later iteration.
                self._distances[min_cluster, max_cluster] = self.inf
                self._distances[max_cluster, min_cluster] = self.inf

        return np.array(self._get_labels(clusters))

    @staticmethod
    def distance(XA, XB):
        # Pairwise Euclidean distances via broadcasting; builds an
        # (n, n, dim) intermediate, so it is meant for page-sized inputs.
        return np.sqrt(((XA[:, np.newaxis] - XB[np.newaxis, :]) ** 2).sum(axis=2))
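
# Usage sketch for the clusterer (toy vectors, nothing external; the shapes
# and n_clusters below are made up for illustration):
#
#     vectors = np.random.rand(20, 8)                       # 20 "sentences", 8-dim
#     labels = AgglomerativeClustering(n_clusters=4).fit_predict(vectors)
#     # labels: array of 20 ids in {0..3}; each id covers a contiguous range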


class CustomAgglomerativeSplitter:
    def __init__(self, emb_model: str):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)

    @staticmethod
    def read_pdfs(path: str) -> tuple[list, list]:
        """Extract every PDF under `path` into a list of sentences per file."""
        pages = []
        file_names = []
        for file in sorted(os.listdir(path)):
            if not file.lower().endswith(".pdf"):
                continue  # skip non-PDF entries such as .DS_Store
            text = textract.process(os.path.join(path, file), method='pdfminer')
            text = text.decode('utf-8').replace('\n', ' ')
            pages.append(nltk.sent_tokenize(text))
            file_names.append(file)
        return pages, file_names

    def get_embeddings(self, pages: list) -> list[np.ndarray]:
        # One embedding matrix per file: shape (n_sentences, embedding_dim).
        return [np.array(self._embeddings_model.embed_documents(texts)) for texts in pages]

    @staticmethod
    def split_list_by_indexes(data: list, indexes: list) -> list:
        # Group elements by cluster label; defaultdict preserves insertion
        # order, so contiguous clusters stay in document order.
        result_dict = defaultdict(list)
        for element, index in zip(data, indexes):
            result_dict[index].append(element)
        return list(result_dict.values())

    @staticmethod
    def balance_pages(pages: list, max_tokens: int = 256) -> list:
        """Join each sentence cluster into a chunk, splitting any chunk whose
        whitespace word count (a rough token proxy) exceeds `max_tokens`."""
        balanced_pages = []
        for page in pages:
            str_page = ' '.join(page)
            n_words = len(str_page.split())
            if n_words > max_tokens:
                n_of_pages = int(np.ceil(n_words / max_tokens))
                result = [' '.join(list(res)) for res in np.array_split(page, n_of_pages)]
                balanced_pages.extend(result)
            else:
                balanced_pages.append(str_page)
        return balanced_pages

    def cluster_pages(self, pages: list, embeddings: list, file_names: list, mean_n_of_sentences: int = 5) -> list:
        documents = []
        for page_number, page in enumerate(pages):
            sentence_embeddings = embeddings[page_number]
            # Guard against short pages: an n_clusters of 0 would make
            # fit_predict loop forever.
            n_clusters = max(1, len(page) // mean_n_of_sentences)
            model = AgglomerativeClustering(n_clusters=n_clusters)
            labels = model.fit_predict(sentence_embeddings)
            page_docs = self.split_list_by_indexes(page, labels)
            page_docs = self.balance_pages(page_docs)
            documents.extend([
                Document(page_content=text, metadata={"file_name": file_names[page_number]}) for text in page_docs
            ])
        return documents

    def read_and_split(self, path: str) -> list:
        """Full pipeline: read PDFs, embed sentences, cluster into Documents."""
        pages, file_names = self.read_pdfs(path)
        embeddings = self.get_embeddings(pages)
        return self.cluster_pages(pages, embeddings, file_names)
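
# Usage sketch (the directory and model name are placeholders; a real run
# needs OPENAI_API_KEY set and PDFs on disk):
#
#     splitter = CustomAgglomerativeSplitter(emb_model="text-embedding-3-small")
#     documents = splitter.read_and_split("data/pdfs")
#     # documents: list[Document] with file_name metadata, ready for indexing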


class FaissDB:
    def __init__(self, emb_model):
        self._embeddings_model = OpenAIEmbeddings(model=emb_model)
        self.index = None

    def init_index(self, documents: list[Document]):
        self.index = FAISS.from_documents(documents, self._embeddings_model)

    def save_index(self, path: str):
        self.index.save_local(path)

    def load_index(self, path: str):
        # FAISS indexes are pickled on disk, hence the explicit opt-in;
        # only load indexes you created yourself.
        self.index = FAISS.load_local(path, self._embeddings_model, allow_dangerous_deserialization=True)

    def similarity_search(self, query: str, k: int = 4):
        if self.index is None:
            raise ValueError("Index is not initialized")
        documents = self.index.similarity_search(query, k)
        return [doc.page_content for doc in documents]
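
# Usage sketch (paths and model name are placeholders):
#
#     db = FaissDB(emb_model="text-embedding-3-small")
#     db.init_index(documents)          # documents from the splitter above
#     db.save_index("faiss_index")      # later: db.load_index("faiss_index")
#     hits = db.similarity_search("termination clause", k=4)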


class AICompletion:
    def __init__(self, chat_model: str = "gpt-4o", temperature: float = 0.0):
        self.human = "{text}"  # the human turn is just the raw user text
        self.model = ChatOpenAI(model=chat_model, temperature=temperature)

    def get_answer(self, system_prompt: str, text: str) -> Optional[str]:
        # Build a two-message prompt and pipe it into the chat model (LCEL).
        prompt = ChatPromptTemplate.from_messages([("system", system_prompt),
                                                   ("human", self.human)])
        chain = prompt | self.model
        return chain.invoke({"text": text}).content
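
# Usage sketch (the prompt text is illustrative):
#
#     llm = AICompletion()
#     answer = llm.get_answer("You answer strictly from the given context.",
#                             "Context: ...\n\nQuestion: ...")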


def define_query(query: str, chat_model: AICompletion) -> Optional[str]:
    # Let the LLM rewrite/classify the query; DEFINE_QUERY_PROMPT is expected
    # to answer exactly "Unrelated." for off-topic questions.
    result = chat_model.get_answer(DEFINE_QUERY_PROMPT, query)
    return result if result != "Unrelated." else None
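

if __name__ == "__main__":
    # Minimal end-to-end sketch of the pipeline above. The directory, index
    # path, embedding model, and question are placeholders; a real run needs
    # OPENAI_API_KEY and a folder of PDFs.
    splitter = CustomAgglomerativeSplitter(emb_model="text-embedding-3-small")
    docs = splitter.read_and_split("data/pdfs")

    db = FaissDB(emb_model="text-embedding-3-small")
    db.init_index(docs)
    db.save_index("faiss_index")

    llm = AICompletion()
    refined = define_query("What does the contract say about termination?", llm)
    if refined is not None:
        for chunk in db.similarity_search(refined, k=4):
            print(chunk, "\n---")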