Spaces:
Runtime error
Runtime error
| import os | |
| from typing import List, Union | |
| from pdfminer.high_level import extract_text | |
| import io | |
| from chainlit.types import AskFileResponse | |
| import re | |
class TextFileLoader:
    """Gather the contents of ``.txt`` files from a single file or a directory tree.

    Each file that is read is stored as one string in ``documents``; repeated
    loads keep appending to the same list.
    """

    def __init__(self, path: str, encoding: str = "utf-8"):
        """Record the source path and the text encoding used when reading."""
        self.path = path
        self.encoding = encoding
        self.documents = []

    def load(self):
        """Read ``path`` into ``documents``.

        A directory is walked recursively; otherwise ``path`` must be an
        existing file whose name ends in ``.txt``.

        Raises:
            ValueError: If ``path`` is neither a directory nor a ``.txt`` file.
        """
        if os.path.isdir(self.path):
            self.load_directory()
            return
        if os.path.isfile(self.path) and self.path.endswith(".txt"):
            self.load_file()
            return
        raise ValueError(
            "Provided path is neither a valid directory nor a .txt file."
        )

    def load_file(self):
        """Append the full text of the file at ``path`` to ``documents``."""
        with open(self.path, "r", encoding=self.encoding) as handle:
            contents = handle.read()
        self.documents.append(contents)

    def load_directory(self):
        """Append the text of every ``.txt`` file found under ``path``."""
        for folder, _, names in os.walk(self.path):
            for name in names:
                if not name.endswith(".txt"):
                    continue
                full_path = os.path.join(folder, name)
                with open(full_path, "r", encoding=self.encoding) as handle:
                    self.documents.append(handle.read())

    def load_documents(self):
        """Run ``load`` and return the accumulated ``documents`` list."""
        self.load()
        return self.documents
class PDFFileLoader(TextFileLoader):
    """Extract text from PDFs given as uploaded files, raw bytes, a file path or a directory.

    Extracted text is appended to ``documents`` (inherited from
    ``TextFileLoader``), one string per PDF.
    """

    def __init__(
        self,
        path: str,
        encoding: str = "utf-8",
        content=None,
        files: list[AskFileResponse] = None,
    ):
        """Store the possible PDF sources.

        Args:
            path: File or directory path; also checked for a ``.pdf`` suffix
                when ``content`` is supplied.
            encoding: Passed to ``extract_text`` as ``codec`` for path-based loads.
            content: Raw PDF bytes already held in memory, if any.
            files: Uploaded files; when this is a list it takes precedence
                over every other source in ``load``.
        """
        super().__init__(path, encoding)
        self.content = content
        self.files = files

    def load(self):
        """Load from the first applicable source.

        Order of precedence: ``files`` list, directory at ``path``, ``.pdf``
        file at ``path``, in-memory ``content`` with a ``.pdf`` ``path``.

        Raises:
            ValueError: If none of the sources applies.
        """
        if isinstance(self.files, list):
            for file in self.files:
                # Entries without content or without a .pdf path are skipped.
                if file.content and file.path.endswith(".pdf"):
                    self.content = file.content
                    self.load_content()
        elif os.path.isdir(self.path):
            self.load_directory()
        elif os.path.isfile(self.path) and self.path.endswith(".pdf"):
            print("loading file ...")
            self.load_file()
        elif self.content and self.path.endswith(".pdf"):
            print("loading content ...")
            self.load_content()
        else:
            raise ValueError(
                "Provided path is neither a valid directory nor a .pdf file."
            )

    def load_content(self):
        """Load pdf already in memory"""
        text = extract_text(io.BytesIO(self.content))
        text = self.clean_text(text)
        self.documents.append(text)

    def clean_text(self, text):
        """Flatten extracted PDF text onto one line and drop page-break artefacts.

        Newlines become spaces, runs of spaces collapse to one, a number
        sitting directly before a form feed (the page number) is removed,
        and the form feeds themselves are replaced by spaces.
        """
        # remove all \n
        text = text.replace('\n', ' ')
        text = re.sub(' +', ' ', text)
        # remove page number, we find it because it appears before '\x0c', use regex to find it
        text = re.sub(r'\d+ \x0c', '\x0c', text)
        # remove all '\x0c'
        text = text.replace('\x0c', ' ')
        return text

    def load_file(self):
        """Extract the PDF at ``path`` and append its text to ``documents``.

        NOTE: unlike ``load_content``, the text is stored without ``clean_text``.
        """
        text = extract_text(pdf_file=self.path, codec=self.encoding)
        self.documents.append(text)

    def load_directory(self):
        """Extract every ``.pdf`` found under ``path`` and append each text to ``documents``.

        NOTE: unlike ``load_content``, the text is stored without ``clean_text``.
        """
        for root, _, files in os.walk(self.path):
            for file in files:
                if file.endswith(".pdf"):
                    # extract_text takes ``codec`` (as load_file passes it);
                    # the previous ``encoding=`` keyword raised TypeError.
                    self.documents.append(
                        extract_text(os.path.join(root, file), codec=self.encoding)
                    )
class CharacterTextSplitter:
    """Cut strings into fixed-width character windows that overlap their neighbours."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        """Store the window width and the overlap between consecutive windows.

        ``chunk_size`` must be strictly greater than ``chunk_overlap``.
        """
        assert (
            chunk_size > chunk_overlap
        ), "Chunk size must be greater than chunk overlap"
        self.chunk_overlap = chunk_overlap
        self.chunk_size = chunk_size

    def split(self, text: str) -> List[str]:
        """Return the windows of ``text``; the last ones may be shorter than ``chunk_size``."""
        width = self.chunk_size
        # Each window starts this many characters after the previous one.
        stride = width - self.chunk_overlap
        return [
            text[start : start + width]
            for start in range(0, len(text), stride)
        ]

    def split_texts(self, texts: List[str]) -> List[str]:
        """Split every string in ``texts`` and return all windows in one flat list."""
        return [piece for document in texts for piece in self.split(document)]
if __name__ == "__main__":
    # Demo: chunk a sample text file and show the first two and last two chunks.
    loader = TextFileLoader("data/KingLear.txt")
    loader.load()
    chunks = CharacterTextSplitter().split_texts(loader.documents)
    print(len(chunks))
    for position, index in enumerate((0, 1, -2, -1)):
        if position:
            # Separator goes between samples, not before the first one.
            print("--------")
        print(chunks[index])