from langchain_community.document_loaders import (
    UnstructuredWordDocumentLoader,
    TextLoader,
    CSVLoader,
    UnstructuredMarkdownLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.core.chunks import Chunk
import nltk
from uuid import uuid4
import numpy as np
from app.settings import logging, settings
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any
import os
import fitz
|
class PDFLoader:
    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> list[Document]:
        docs = []
        with fitz.open(self.file_path) as doc:
            for page in doc:
                text = page.get_text("text")
                metadata = {
                    "source": self.file_path,
                    "page": page.number,
                }
                docs.append(Document(page_content=text, metadata=metadata))
        return docs
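    # Usage sketch (hypothetical path): every page becomes its own Document,
    # so a three-page PDF yields three Documents whose metadata["page"] is 0, 1, 2.
    #
    #     docs = PDFLoader("reports/q3_summary.pdf").load()
    #     print(docs[0].metadata)  # {"source": "reports/q3_summary.pdf", "page": 0}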
|
|
|
|
class DocumentProcessor:
    """
    TODO: determine the most suitable chunk size

    chunks_unsaved -> recently created chunks that have not been saved to the db yet
    unprocessed -> loaded documents that have not been split into chunks yet
    max_workers -> upper bound on worker processes used for parallel chunking
    text_splitter -> text splitting strategy
    """

    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        self.unprocessed: list[Document] = []
        self.max_workers = min(4, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )
|
|
| """ |
| Measures cosine between two vectors |
| """ |
|
|
| def cosine_similarity(self, vec1, vec2): |
| return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2)) |
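    # Worked example: vec1 = [1, 0], vec2 = [1, 1] gives
    # (1*1 + 0*1) / (1 * sqrt(2)) = 1/sqrt(2) ≈ 0.707; parallel vectors score 1.0,
    # orthogonal vectors score 0.0.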
|
|
| """ |
| Updates a list of the most relevant chunks without interacting with db |
| """ |
|
|
| def update_most_relevant_chunk( |
| self, |
| chunk: list[np.float64, Chunk], |
| relevant_chunks: list[list[np.float64, Chunk]], |
| mx_len=15, |
| ): |
| relevant_chunks.append(chunk) |
| for i in range(len(relevant_chunks) - 1, 0, -1): |
| if relevant_chunks[i][0] > relevant_chunks[i - 1][0]: |
| relevant_chunks[i], relevant_chunks[i - 1] = ( |
| relevant_chunks[i - 1], |
| relevant_chunks[i], |
| ) |
| else: |
| break |
|
|
| if len(relevant_chunks) > mx_len: |
| del relevant_chunks[-1] |
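    # Example with hypothetical scores: starting from [(0.9, a), (0.5, b)],
    # inserting (0.7, c) yields [(0.9, a), (0.7, c), (0.5, b)]; once the list
    # already holds mx_len entries, the next insert evicts the lowest-scoring pair.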
|
|
| """ |
| Loads one file - extracts text from file |
| |
| TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader |
| TODO: Play with .pdf and text from img extraction |
| TODO: Try chunking with llm |
| |
| add_to_unprocessed -> used to add loaded file to the list of unprocessed(unchunked) files if true |
| """ |
|
|
| def check_size(self, file_path: str = "") -> bool: |
| try: |
| size = os.path.getsize(filename=file_path) |
| except Exception: |
| size = 0 |
|
|
| if size > 1000000: |
| return True |
| return False |
|
|
    def document_multiplexer(
        self, filepath: str, get_loader: bool = False, get_chunking_strategy: bool = False
    ):
        """
        Picks either the loader (get_loader=True) or the chunking strategy
        (get_chunking_strategy=True) for a file based on its extension.
        """
        loader = None
        if filepath.endswith(".pdf"):
            loader = PDFLoader(file_path=filepath)
        elif filepath.endswith((".docx", ".doc")):
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".csv"):
            loader = CSVLoader(file_path=filepath)
        elif filepath.endswith(".json"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path=filepath)

        # PDFs are always chunked sequentially; other formats go parallel when large.
        if filepath.endswith(".pdf"):
            parallelization = False
        else:
            parallelization = self.check_size(file_path=filepath)

        if get_loader:
            return loader
        elif get_chunking_strategy:
            return parallelization
        else:
            raise RuntimeError("What to do, my lord?")
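    # Example (hypothetical paths, processor being a DocumentProcessor instance):
    #     processor.document_multiplexer("notes.md", get_loader=True)             # -> UnstructuredMarkdownLoader
    #     processor.document_multiplexer("big_dump.csv", get_chunking_strategy=True)  # -> True if the file is > 1 MB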
|
|
    def load_document(
        self, filepath: str, add_to_unprocessed: bool = False
    ) -> list[Document]:
        """
        Loads one file - extracts its text and chunks it.

        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and text-from-image extraction
        TODO: Try chunking with an llm

        add_to_unprocessed -> when True, the loaded file is added to the list of
        unprocessed (unchunked) documents before chunking
        """
        loader = self.document_multiplexer(filepath=filepath, get_loader=True)

        if loader is None:
            raise RuntimeError("Unsupported type of file")

        try:
            documents: list[Document] = loader.load()
        except Exception as e:
            raise RuntimeError("File is corrupted") from e

        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)

        strategy = self.document_multiplexer(filepath=filepath, get_chunking_strategy=True)
        logging.info("Strategy --> %s", strategy)
        self.generate_chunks(parallelization=strategy)
        return documents
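    # Usage sketch (hypothetical path): loading a single file stages its chunks
    # in self.chunks_unsaved right away.
    #
    #     processor = DocumentProcessor()
    #     processor.load_document("docs/architecture.md", add_to_unprocessed=True)
    #     print(len(processor.get_all_chunks()))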
|
|
| """ |
| Similar to load_document, but for multiple files |
| |
| add_to_unprocessed -> used to add loaded files to the list of unprocessed(unchunked) files if true |
| """ |
|
|
| def load_documents( |
| self, documents: list[str], add_to_unprocessed: bool = False |
| ) -> list[Document]: |
| extracted_documents: list[Document] = [] |
|
|
| for doc in documents: |
| temp_storage: list[Document] = [] |
|
|
| try: |
| temp_storage = self.load_document( |
| filepath=doc, add_to_unprocessed=True |
| ) |
| except Exception as e: |
| logging.error( |
| "Error at load_documents while loading %s", doc, exc_info=e |
| ) |
| continue |
|
|
| for extrc_doc in temp_storage: |
| extracted_documents.append(extrc_doc) |
|
|
| if add_to_unprocessed: |
| self.unprocessed.append(extrc_doc) |
|
|
| return extracted_documents |
|
|
    def split_into_groups(self, original_list: list[Any], split_by: int = 15) -> list[list[Any]]:
        """Splits a list into consecutive groups of at most split_by elements."""
        output = []
        for i in range(0, len(original_list), split_by):
            output.append(original_list[i: i + split_by])
        return output
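    # Example: split_into_groups(list(range(7)), split_by=3) -> [[0, 1, 2], [3, 4, 5], [6]]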
|
|
    def _chunkinize(self, document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
        """Converts split Documents into Chunk objects, resolving their start/end lines."""
        output: list[Chunk] = []
        for chunk in text:
            start_l, end_l = self.get_start_end_lines(
                splitted_text=lines,
                start_char=chunk.metadata.get("start_index", 0),
                end_char=chunk.metadata.get("start_index", 0)
                + len(chunk.page_content),
            )

            new_chunk = Chunk(
                id=uuid4(),
                filename=document.metadata.get("source", ""),
                page_number=document.metadata.get("page", 0),
                start_index=chunk.metadata.get("start_index", 0),
                start_line=start_l,
                end_line=end_l,
                text=chunk.page_content,
            )

            output.append(new_chunk)
        return output
|
|
    def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        """Builds a line index: each entry stores the 1-based line id and its character span."""
        current_start = 0
        output: list[dict] = []
        for i, line in enumerate(splitted_document):
            output.append(
                {
                    "id": i + 1,
                    "start": current_start,
                    "end": current_start + len(line) + 1,  # +1 accounts for the stripped newline
                    "text": line,
                }
            )
            current_start += len(line) + 1
        return output
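    # Example: precompute_lines(["abc", "de"]) ->
    #     [{"id": 1, "start": 0, "end": 4, "text": "abc"},
    #      {"id": 2, "start": 4, "end": 7, "text": "de"}]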
|
|
    def generate_chunks(self, parallelization: bool = True):
        """Splits every unprocessed document into chunks and stages them in chunks_unsaved."""
        intermediate = []
        for document in self.unprocessed:
            text: list[Document] = self.text_splitter.split_documents(documents=[document])
            lines: list[dict] = self.precompute_lines(splitted_document=document.page_content.splitlines())
            groups = self.split_into_groups(original_list=text, split_by=50)

            if parallelization:
                logging.info("<------- Apply Parallel Execution ------->")
                with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                    futures = [
                        executor.submit(self._chunkinize, document, group, lines)
                        for group in groups
                    ]
                    for future in as_completed(futures):
                        intermediate.append(future.result())
            else:
                intermediate.append(self._chunkinize(document=document, text=text, lines=lines))

        for group in intermediate:
            for chunk in group:
                self.chunks_unsaved.append(chunk)

        self.unprocessed = []
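    # Note: the parallel path pickles the bound method, the Document and each group
    # over to worker processes, so everything involved must be picklable; the
    # sequential path produces exactly the same chunks, just on a single core.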
|
|
    def find_line(self, splitted_text: list[dict], char) -> int:
        """Binary-searches the precomputed line index and returns the 1-based line number for a character offset."""
        l, r = 0, len(splitted_text) - 1

        while l <= r:
            m = (l + r) // 2
            line = splitted_text[m]

            if line["start"] <= char < line["end"]:
                return m + 1
            elif char < line["start"]:
                r = m - 1
            else:
                l = m + 1

        # Fallback for offsets outside the indexed range: clamp to the nearest line.
        return max(r, 0) + 1
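    # Example: with lines [{"start": 0, "end": 4}, {"start": 4, "end": 7}],
    # find_line(..., char=5) returns 2 because 4 <= 5 < 7.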
|
|
    def get_start_end_lines(
        self,
        splitted_text: list[dict],
        start_char: int,
        end_char: int,
        debug_mode: bool = False,
    ) -> tuple[int, int]:
        start = self.find_line(splitted_text=splitted_text, char=start_char)
        end = self.find_line(splitted_text=splitted_text, char=end_char)
        return (start, end)
|
|
| """ |
| Note: it should be used only once to download tokenizers, futher usage is not recommended |
| """ |
|
|
| def update_nltk(self) -> None: |
| nltk.download("punkt") |
| nltk.download("averaged_perceptron_tagger") |
|
|
| """ |
| For now the system works as follows: we save recently loaded chunks in two arrays: |
| chunks - for all chunks, even for that ones that havn't been saveed to db |
| chunks_unsaved - for chunks that have been added recently |
| I do not know weather we really need to store all chunks that were added in the |
| current session, but chunks_unsaved are used to avoid dublications while saving to db. |
| """ |
|
|
| def get_and_save_unsaved_chunks(self) -> list[Chunk]: |
| chunks_copy: list[Chunk] = self.chunks_unsaved.copy() |
| self.clear_unsaved_chunks() |
| return chunks_copy |
|
|
| def clear_unsaved_chunks(self): |
| self.chunks_unsaved = [] |
|
|
| def get_all_chunks(self) -> list[Chunk]: |
| return self.chunks_unsaved |
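
# A minimal end-to-end sketch (hypothetical paths; assumes settings.text_splitter
# supplies the RecursiveCharacterTextSplitter kwargs, including add_start_index=True
# so that each split Document carries the "start_index" metadata used for line resolution):
#
#     processor = DocumentProcessor()
#     processor.load_documents(["notes/meeting.txt", "reports/q3.pdf"], add_to_unprocessed=True)
#     for chunk in processor.get_and_save_unsaved_chunks():
#         print(chunk.filename, chunk.page_number, chunk.start_line, chunk.end_line)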
|
|