from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.models import Embedder
from app.chunks import Chunk
import nltk  # used for the tokenizer download workflow (see update_nltk)
from uuid import uuid4  # for generating unique ids as hex (uuid4 is used because it derives ids from pseudo-random numbers, unlike uuid1 and the others)
import numpy as np
from app.settings import logging, text_splitter_config, embedder_model


# TODO: replace PyPDFLoader since it is completely unusable OR try to fix it
class DocumentProcessor:
    '''
    TODO: determine the most suitable chunk size
    chunks -> the list of chunks from loaded files
    chunks_unsaved -> the list of recently added chunks that have not been saved to the db yet
    processed -> the list of files that have already been split into chunks
    unprocessed -> !processed
    text_splitter -> text splitting strategy
    '''

    def __init__(self):
        self.chunks: list[Chunk] = []
        self.chunks_unsaved: list[Chunk] = []
        self.processed: list[Document] = []
        self.unprocessed: list[Document] = []
        self.embedder = Embedder(embedder_model)
        self.text_splitter = RecursiveCharacterTextSplitter(**text_splitter_config)

    def cosine_similarity(self, vec1, vec2):
        '''
        Measures the cosine similarity between two vectors
        '''
        return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
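
    # Worked example (a sketch, not part of the pipeline): for unit vectors the
    # denominator is 1.0, so
    #   cosine_similarity(np.array([1.0, 0.0]), np.array([1.0, 0.0])) -> 1.0 (identical)
    #   cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0])) -> 0.0 (orthogonal)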

    def update_most_relevant_chunk(self, chunk: tuple[np.float64, Chunk],
                                   relevant_chunks: list[tuple[np.float64, Chunk]],
                                   mx_len=15):
        '''
        Updates the list of the most relevant chunks without interacting with the db
        '''
        relevant_chunks.append(chunk)
        for i in range(len(relevant_chunks) - 1, 0, -1):
            if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
                relevant_chunks[i], relevant_chunks[i - 1] = relevant_chunks[i - 1], relevant_chunks[i]
            else:
                break
        if len(relevant_chunks) > mx_len:
            del relevant_chunks[-1]
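
    # The single bubble-up pass above is enough because relevant_chunks stays sorted
    # by similarity in descending order between calls; truncating the tail then keeps
    # at most mx_len entries, i.e. a bounded top-k insertion.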

    def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Loads one file - extracts the text from the file
        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and text-from-image extraction
        TODO: Try chunking with an llm
        add_to_unprocessed -> if true, the loaded file is added to the list of unprocessed (unchunked) files
        '''
        loader = None
        if filepath.endswith(".pdf"):
            # PyPDFLoader returns one Document per page, so a presentation is split into slides and each slide is processed as a separate file
            loader = PyPDFLoader(file_path=filepath)
        elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            # loader = Docx2txtLoader(file_path=filepath)  # try it later, since UnstructuredWordDocumentLoader is extremely slow
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        if loader is None:
            raise RuntimeError("Unsupported file type")
        # We cannot assign a single value here since .pdf files are split into several documents
        documents: list[Document] = []
        try:
            documents = loader.load()
        except Exception as e:
            raise RuntimeError("File is corrupted") from e
        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)
        return documents
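
    # Usage sketch (the path is hypothetical):
    #   docs = processor.load_document("uploads/lecture.pdf", add_to_unprocessed=True)
    #   -> a ten-page pdf yields ten Document objects in `docs`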

    def load_documents(self, documents: list[str], add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Similar to load_document, but for multiple files
        add_to_unprocessed -> if true, the loaded files are added to the list of unprocessed (unchunked) files
        '''
        extracted_documents: list[Document] = []
        for doc in documents:
            temp_storage: list[Document] = []
            try:
                # In some cases add_to_unprocessed should be True here, but I cannot imagine any :(
                temp_storage = self.load_document(filepath=doc, add_to_unprocessed=False)
            except Exception as e:
                logging.error("Error at load_documents while loading %s", doc, exc_info=e)
                continue
            for extrc_doc in temp_storage:
                extracted_documents.append(extrc_doc)
                if add_to_unprocessed:
                    self.unprocessed.append(extrc_doc)
        return extracted_documents

    def generate_chunks(self, query: str = "", embedding: bool = False):
        '''
        Generates chunks with the recursive splitter from the list of unprocessed files, adds those files to the list of processed ones, and clears unprocessed
        TODO: try to split the text with another llm (not really needed, but we should at least try it)
        '''
        most_relevant = []
        if embedding:
            query_embedded = self.embedder.encode(query)
        for document in self.unprocessed:
            self.processed.append(document)
            text: list[Document] = self.text_splitter.split_documents([document])
            lines: list[str] = document.page_content.split("\n")
            for chunk in text:
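                # NOTE: "start_index" only appears in chunk.metadata when the splitter
                # is constructed with add_start_index=True (assumed to be part of
                # text_splitter_config); otherwise the fallback 0 is used below.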
                start_l, end_l = self.get_start_end_lines(
                    splitted_text=lines,
                    start_char=chunk.metadata.get("start_index", 0),
                    end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content)
                )
                new_chunk = Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=chunk.metadata.get("start_index", 0),
                    start_line=start_l,
                    end_line=end_l,
                    text=chunk.page_content
                )
                if embedding:
                    chunk_embedded = self.embedder.encode(new_chunk.text)
                    similarity = self.cosine_similarity(query_embedded, chunk_embedded)
                    self.update_most_relevant_chunk((similarity, new_chunk), most_relevant)
                self.chunks.append(new_chunk)
                self.chunks_unsaved.append(new_chunk)
        self.unprocessed = []
        logging.debug("Unsaved chunks after generation: %d", len(self.chunks_unsaved))
        return most_relevant
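
    # Usage sketch: with embedding=True the freshly created chunks are also ranked
    # against the query, e.g.
    #   top = processor.generate_chunks(query="course grading policy", embedding=True)
    #   -> a list of (similarity, Chunk) pairs sorted by similarity in descending order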

    def get_start_end_lines(self, splitted_text: list[str], start_char: int, end_char: int,
                            debug_mode: bool = False) -> tuple[int, int]:
        '''
        Determines the lines where the chunk starts and ends (1-based indexing)
        Some magic stuff here. To be honest, I only understood it on the 7th attempt
        TODO: invent a more efficient way
        splitted_text -> the original text split by \n
        start_char -> index of the character where the current chunk starts
        end_char -> index of the character where the current chunk ends
        debug_mode -> flag which enables printing useful info about the process
        '''
        if debug_mode:
            logging.info(splitted_text)
        start, end, char_ct = 0, 0, 0
        iter_count = 1
        for i, line in enumerate(splitted_text):
            if debug_mode:
                logging.info(
                    f"start={start_char}, current={char_ct}, end_current={char_ct + len(line) + 1}, end={end_char}, len={len(line)}, iter={iter_count}\n")
            if char_ct <= start_char <= char_ct + len(line) + 1:
                start = i + 1
            if char_ct <= end_char <= char_ct + len(line) + 1:
                end = i + 1
                break
            iter_count += 1
            char_ct += len(line) + 1
        if debug_mode:
            logging.info(f"result => {start} {end}\n\n\n")
        return start, end
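
    # Worked example (a sketch): in "ab\ncd", splitted_text == ["ab", "cd"]; each line
    # spans len(line) + 1 characters because of the stripped "\n", so chars 0..2 belong
    # to line 1 and chars 3..4 to line 2. A chunk covering chars 1..4 therefore
    # yields (start, end) == (1, 2).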

    def update_nltk(self) -> None:
        '''
        Note: it should be used only once to download the tokenizers; further calls are not recommended
        '''
        nltk.download('punkt')
        nltk.download('averaged_perceptron_tagger')

    # For now the system works as follows: we keep recently loaded chunks in two lists:
    # chunks - all chunks, even the ones that haven't been saved to the db yet
    # chunks_unsaved - chunks that have been added recently
    # I do not know whether we really need to store all chunks added in the current
    # session, but chunks_unsaved is used to avoid duplicates while saving to the db.
    def clear_unsaved_chunks(self):
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        return self.chunks

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        '''
        If we want to save chunks to the db, we need to clear the temp storage to avoid duplicates
        '''
        # copy chunks_unsaved (not chunks), otherwise already-saved chunks would be returned again
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy
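

# Minimal end-to-end sketch (the file paths below are hypothetical; the app.*
# modules are assumed to be configured as imported above):
if __name__ == "__main__":
    processor = DocumentProcessor()
    processor.load_documents(["uploads/intro.txt", "uploads/lecture.pdf"], add_to_unprocessed=True)
    top = processor.generate_chunks(query="course grading policy", embedding=True)
    for similarity, chunk in top:
        print(f"{similarity:.3f} {chunk.filename} lines {chunk.start_line}-{chunk.end_line}")
    # persist the fresh chunks exactly once, e.g. into a db layer
    fresh_chunks = processor.get_and_save_unsaved_chunks()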