import os
import re
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin

import requests
import pysrt
import html2text
import bs4
import PyPDF2
from langchain_community.document_loaders import (
    Docx2txtLoader,
    YoutubeLoader,
    TextLoader,
)
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings

from modules.dataloader.pdf_readers.base import PDFReader
from modules.dataloader.pdf_readers.llama import LlamaParser
from modules.dataloader.pdf_readers.gpt import GPTParser
from modules.dataloader.helpers import get_metadata
from modules.config.constants import TIMEOUT

logger = logging.getLogger(__name__)
BASE_DIR = os.getcwd()
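
# HTMLReader fetches a web page, rewrites its relative links to absolute URLs
# (logging a warning for broken ones), and converts the HTML to Markdown.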
|
class HTMLReader:
    def __init__(self):
        pass

    def read_url(self, url):
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code == 200:
            return response.text
        else:
            logger.warning(f"Failed to download HTML from URL: {url}")
            return None

    def check_links(self, base_url, html_content):
        soup = bs4.BeautifulSoup(html_content, "html.parser")
        for link in soup.find_all("a"):
            href = link.get("href")

            # Skip empty links and in-page anchors.
            if not href or href.startswith("#"):
                continue
            elif not href.startswith("https"):
                # Upgrade only the scheme, not every "http" substring in the URL.
                href = href.replace("http://", "https://", 1)

            # Rewrite relative links as absolute URLs.
            absolute_url = urljoin(base_url, href)
            link["href"] = absolute_url

            resp = requests.head(absolute_url, timeout=TIMEOUT)
            if resp.status_code != 200:
                logger.warning(
                    f"Link {absolute_url} is broken. Status code: {resp.status_code}"
                )

        return str(soup)

    def html_to_md(self, url, html_content):
        html_processed = self.check_links(url, html_content)
        markdown_content = html2text.html2text(html_processed)
        return markdown_content

    def read_html(self, url):
        html_content = self.read_url(url)
        if html_content:
            return self.html_to_md(url, html_content)
        else:
            return None
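
# FileReader wraps the per-format loaders (PDF, txt, docx, srt, YouTube
# transcripts, HTML, and .tex over HTTP); the PDF backend is chosen by `kind`.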
|
class FileReader:
    def __init__(self, logger, kind):
        self.logger = logger
        self.kind = kind
        # Select the PDF parser backend; default to the plain PDFReader.
        if kind == "llama":
            self.pdf_reader = LlamaParser()
        elif kind == "gpt":
            self.pdf_reader = GPTParser()
        else:
            self.pdf_reader = PDFReader()
        self.web_reader = HTMLReader()
        self.logger.info(
            f"Initialized FileReader with {kind} PDF reader and HTML reader"
        )

    def extract_text_from_pdf(self, pdf_path):
        text = ""
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            for page in reader.pages:
                text += page.extract_text()
        return text

    def read_pdf(self, temp_file_path: str):
        documents = self.pdf_reader.parse(temp_file_path)
        return documents

    def read_txt(self, temp_file_path: str):
        loader = TextLoader(temp_file_path, autodetect_encoding=True)
        return loader.load()

    def read_docx(self, temp_file_path: str):
        loader = Docx2txtLoader(temp_file_path)
        return loader.load()

    def read_srt(self, temp_file_path: str):
        subs = pysrt.open(temp_file_path)
        # Join subtitle entries with newlines so words do not run together.
        text = "\n".join(sub.text for sub in subs)
        return [Document(page_content=text)]

    def read_youtube_transcript(self, url: str):
        loader = YoutubeLoader.from_youtube_url(
            url, add_video_info=True, language=["en"], translation="en"
        )
        return loader.load()

    def read_html(self, url: str):
        return [Document(page_content=self.web_reader.read_html(url))]

    def read_tex_from_url(self, tex_url):
        response = requests.get(tex_url, timeout=TIMEOUT)
        if response.status_code == 200:
            return [Document(page_content=response.text)]
        else:
            self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}")
            return None
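
# ChunkProcessor splits parsed documents into chunks for the vector store,
# caches parsed content and metadata on disk, and reloads the cache to avoid
# reparsing unchanged sources.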
|
class ChunkProcessor:
    def __init__(self, config, logger):
        self.config = config
        self.logger = logger

        self.document_data = {}
        self.document_metadata = {}
        self.document_chunks_full = []

        # Reuse previously parsed content unless a full reparse was requested.
        if not config["vectorstore"]["reparse_files"]:
            self.load_document_data()

        if config["splitter_options"]["use_splitter"]:
            if config["splitter_options"]["chunking_mode"] == "fixed":
                if config["splitter_options"]["split_by_token"]:
                    self.splitter = (
                        RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                            chunk_size=config["splitter_options"]["chunk_size"],
                            chunk_overlap=config["splitter_options"]["chunk_overlap"],
                            separators=config["splitter_options"]["chunk_separators"],
                            disallowed_special=(),
                        )
                    )
                else:
                    # disallowed_special is a tiktoken option and is not accepted
                    # by the plain character-based splitter constructor.
                    self.splitter = RecursiveCharacterTextSplitter(
                        chunk_size=config["splitter_options"]["chunk_size"],
                        chunk_overlap=config["splitter_options"]["chunk_overlap"],
                        separators=config["splitter_options"]["chunk_separators"],
                    )
            else:
                self.splitter = SemanticChunker(
                    OpenAIEmbeddings(), breakpoint_threshold_type="percentile"
                )
        else:
            self.splitter = None
        self.logger.info("ChunkProcessor instance created")
|
    def remove_delimiters(self, document_chunks: list):
        # Strip leftover delimiter patterns (regexes from the config) from each chunk.
        for chunk in document_chunks:
            for delimiter in self.config["splitter_options"]["delimiters_to_remove"]:
                chunk.page_content = re.sub(delimiter, " ", chunk.page_content)
        return document_chunks

    def remove_chunks(self, document_chunks: list):
        # Drop a configured number of chunks from the front and back of the document.
        front = self.config["splitter_options"]["front_chunk_to_remove"]
        end = self.config["splitter_options"]["last_chunks_to_remove"]
        for _ in range(front):
            del document_chunks[0]
        for _ in range(end):
            document_chunks.pop()
        return document_chunks

    def process_chunks(
        self, documents, file_type="txt", source="", page=0, metadata=None
    ):
        metadata = metadata or {}
        documents = [
            Document(page_content=documents, metadata={"source": source, "page": page})
        ]
        # In fixed mode, PDF pages are kept whole instead of being re-split.
        if (
            file_type == "pdf"
            and self.config["splitter_options"]["chunking_mode"] == "fixed"
        ):
            document_chunks = documents
        else:
            document_chunks = self.splitter.split_documents(documents)

        # Attach source, page number, and any additional metadata to every chunk.
        for chunk in document_chunks:
            chunk.metadata["source"] = source
            chunk.metadata["page"] = page
            for key, value in metadata.items():
                chunk.metadata[key] = value

        if self.config["splitter_options"]["remove_leftover_delimiters"]:
            document_chunks = self.remove_delimiters(document_chunks)
        if self.config["splitter_options"]["remove_chunks"]:
            document_chunks = self.remove_chunks(document_chunks)

        return document_chunks
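
    # chunk_docs drives the ingestion pipeline: it parses files and weblinks in
    # a thread pool, chunks every page, and returns the chunks together with
    # per-page names, contents, and metadata.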
|
    def chunk_docs(self, file_reader, uploaded_files, weblinks):
        addl_metadata = get_metadata(
            *self.config["metadata"]["metadata_links"], self.config
        )

        # Skip anything that was already parsed in a previous run.
        if not self.config["vectorstore"]["reparse_files"]:
            total_documents = len(uploaded_files) + len(weblinks)
            uploaded_files = [
                file_path
                for file_path in uploaded_files
                if file_path not in self.document_data
            ]
            weblinks = [link for link in weblinks if link not in self.document_data]
            print(
                f"Total documents to process: {total_documents}, "
                f"Documents already processed: "
                f"{total_documents - len(uploaded_files) - len(weblinks)}"
            )

        # Parse files and weblinks concurrently; results are stored on self.
        with ThreadPoolExecutor() as executor:
            executor.map(
                self.process_file,
                uploaded_files,
                range(len(uploaded_files)),
                [file_reader] * len(uploaded_files),
                [addl_metadata] * len(uploaded_files),
            )
            executor.map(
                self.process_weblink,
                weblinks,
                range(len(weblinks)),
                [file_reader] * len(weblinks),
                [addl_metadata] * len(weblinks),
            )

        document_names = [
            f"{file_name}_{page_num}"
            for file_name, pages in self.document_data.items()
            for page_num in pages.keys()
        ]
        documents = [
            page for doc in self.document_data.values() for page in doc.values()
        ]
        document_metadata = [
            page for doc in self.document_metadata.values() for page in doc.values()
        ]

        self.save_document_data()

        self.logger.info(
            f"Total document chunks extracted: {len(self.document_chunks_full)}"
        )

        return self.document_chunks_full, document_names, documents, document_metadata
|
    def process_documents(
        self, documents, file_path, file_type, metadata_source, addl_metadata
    ):
        file_data = {}
        file_metadata = {}

        for doc in documents:
            # Key each page's content and metadata by its page number
            # (0 for single-page sources).
            page_num = doc.metadata.get("page", 0)
            file_data[page_num] = doc.page_content

            # Merge any additional metadata for this source with the page info.
            metadata = addl_metadata.get(file_path, {}).copy()
            metadata["page"] = page_num
            metadata["source"] = file_path
            file_metadata[page_num] = metadata

            # RAGatouille handles raw documents directly, so skip chunking for it.
            if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]:
                document_chunks = self.process_chunks(
                    doc.page_content,
                    file_type,
                    source=file_path,
                    page=page_num,
                    metadata=metadata,
                )
                self.document_chunks_full.extend(document_chunks)

        self.document_data[file_path] = file_data
        self.document_metadata[file_path] = file_metadata
|
    def process_file(self, file_path, file_index, file_reader, addl_metadata):
        print(f"Processing file {file_index + 1} : {file_path}")
        file_name = os.path.basename(file_path)
        file_type = file_name.split(".")[-1]

        # Dispatch to the reader that matches the file extension.
        read_methods = {
            "pdf": file_reader.read_pdf,
            "txt": file_reader.read_txt,
            "docx": file_reader.read_docx,
            "srt": file_reader.read_srt,
            "tex": file_reader.read_tex_from_url,
        }
        if file_type not in read_methods:
            self.logger.warning(f"Unsupported file type: {file_type}")
            return

        try:
            if file_path in self.document_data:
                # Reuse cached page content instead of re-reading the file.
                self.logger.warning(f"File {file_name} already processed")
                documents = [
                    Document(page_content=content)
                    for content in self.document_data[file_path].values()
                ]
            else:
                documents = read_methods[file_type](file_path)

            self.process_documents(
                documents, file_path, file_type, "file", addl_metadata
            )
        except Exception as e:
            self.logger.error(f"Error processing file {file_name}: {str(e)}")

    def process_weblink(self, link, link_index, file_reader, addl_metadata):
        if link in self.document_data:
            return

        self.logger.info(f"Reading link {link_index + 1} : {link}")

        try:
            # YouTube links get a transcript loader; everything else is read as HTML.
            if "youtube" in link:
                documents = file_reader.read_youtube_transcript(link)
            else:
                documents = file_reader.read_html(link)

            self.process_documents(documents, link, "txt", "link", addl_metadata)
        except Exception as e:
            self.logger.error(
                f"Error reading link {link_index + 1} : {link}: {str(e)}"
            )
|
    def save_document_data(self):
        docs_dir = f"{self.config['log_chunk_dir']}/docs"
        metadata_dir = f"{self.config['log_chunk_dir']}/metadata"

        if not os.path.exists(docs_dir):
            os.makedirs(docs_dir)
            self.logger.info(f"Creating directory {docs_dir} for document data")
        self.logger.info(f"Saving document content to {docs_dir}/doc_content.json")

        if not os.path.exists(metadata_dir):
            os.makedirs(metadata_dir)
            self.logger.info(
                f"Creating directory {metadata_dir} for document metadata"
            )
        self.logger.info(
            f"Saving document metadata to {metadata_dir}/doc_metadata.json"
        )

        with open(f"{docs_dir}/doc_content.json", "w") as json_file:
            json.dump(self.document_data, json_file, indent=4)
        with open(f"{metadata_dir}/doc_metadata.json", "w") as json_file:
            json.dump(self.document_metadata, json_file, indent=4)

    def load_document_data(self):
        docs_path = f"{self.config['log_chunk_dir']}/docs/doc_content.json"
        metadata_path = f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json"
        try:
            with open(docs_path, "r") as json_file:
                self.document_data = json.load(json_file)
            with open(metadata_path, "r") as json_file:
                self.document_metadata = json.load(json_file)
            self.logger.info(
                f"Loaded document content from {docs_path}. "
                f"Total documents: {len(self.document_data)}"
            )
        except FileNotFoundError:
            self.logger.warning(f"Document content not found in {docs_path}")
            self.document_data = {}
            self.document_metadata = {}
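
# DataLoader ties the pieces together: a FileReader to parse each source and a
# ChunkProcessor to split the results into chunks for the vector store.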
|
class DataLoader:
    def __init__(self, config, logger=None):
        self.file_reader = FileReader(
            logger=logger, kind=config["llm_params"]["pdf_reader"]
        )
        self.chunk_processor = ChunkProcessor(config, logger=logger)

    def get_chunks(self, uploaded_files, weblinks):
        return self.chunk_processor.chunk_docs(
            self.file_reader, uploaded_files, weblinks
        )
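
# Minimal CLI for ad-hoc ingestion. Example invocation (the script name and
# config file names here are illustrative, not fixed by this module):
#   python data_loader.py --links <url> [<url> ...] \
#       --config_file config.yml --project_config_file project_config.yml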
|
if __name__ == "__main__":
    import argparse

    import yaml

    parser = argparse.ArgumentParser(description="Process some links.")
    parser.add_argument(
        "--links", nargs="+", required=True, help="List of links to process."
    )
    parser.add_argument(
        "--config_file", type=str, help="Path to the main config file", required=True
    )
    parser.add_argument(
        "--project_config_file",
        type=str,
        help="Path to the project config file",
        required=True,
    )

    args = parser.parse_args()
    links_to_process = args.links

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    with open(args.config_file, "r") as f:
        config = yaml.safe_load(f)

    with open(args.project_config_file, "r") as f:
        project_config = yaml.safe_load(f)

    # Project-specific settings override the main config.
    config.update(project_config)

    STORAGE_DIR = os.path.join(BASE_DIR, config["vectorstore"]["data_path"])
    uploaded_files = [
        os.path.join(STORAGE_DIR, file)
        for file in os.listdir(STORAGE_DIR)
        if file != "urls.txt"
    ]

    data_loader = DataLoader(config, logger=logger)
    # Files from the storage directory go in the first argument and weblinks in
    # the second, matching DataLoader.get_chunks(uploaded_files, weblinks).
    (
        document_chunks,
        document_names,
        documents,
        document_metadata,
    ) = data_loader.get_chunks(
        uploaded_files,
        links_to_process,
    )

    print(document_names[:5])
    print(len(document_chunks))
|