# EndoBot / chunker.py
# SergioI1991's picture
# Upload 43 files
# 606fa93 verified
import os
import logging
import json
import argparse
from typing import List, Dict, Optional
from pypdf import PdfReader
import docx as python_docx
from langchain.text_splitter import RecursiveCharacterTextSplitter
# --- Logging Setup ---
# Emit records at INFO level and above through a stream handler, each line
# prefixed with timestamp, logger name and severity.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
# Module-scoped logger shared by the functions in this script.
logger = logging.getLogger(__name__)
# --- Text Extraction Helper Functions ---
# Note: These are duplicated from llm_handling.py to make this a standalone script.
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract the plain text of a PDF, DOCX or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: ``'pdf'``, ``'docx'`` or ``'txt'``; any other value is
            reported as unsupported.

    Returns:
        The extracted text with leading/trailing whitespace stripped, or
        ``None`` when the type is unsupported, the file yields no text, or
        extraction raises (the error is logged, not propagated).
    """
    base_name = os.path.basename(file_path)
    logger.info(f"Extracting text from {file_type.upper()} file: {base_name}")
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once: text extraction is the costly
            # step, so filter and join on the already-extracted string.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(
                f"{page_text}\n" for page_text in page_texts if page_text
            )
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            # Empty paragraphs are dropped so they do not add blank lines.
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # Undecodable bytes are skipped rather than aborting the file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"Unsupported file type for text extraction: {file_type} for file {base_name}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"No text content extracted from {base_name}")
            return None
        return text_content.strip()
    except Exception as e:
        # Best-effort by design: one unreadable file is logged and reported
        # as None so the caller can continue with the remaining files.
        logger.error(f"Error extracting text from {base_name} ({file_type.upper()}): {e}", exc_info=True)
        return None
# Dispatch table: lowercase file extension -> callable that takes a file path
# and returns the result of extract_text_from_file for that file type.
# The extension is bound as a default argument so each lambda keeps its own
# value instead of sharing the comprehension's loop variable.
SUPPORTED_EXTENSIONS = {
    ext: (lambda path, _file_type=ext: extract_text_from_file(path, _file_type))
    for ext in ('pdf', 'docx', 'txt')
}
def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None,
) -> None:
    """Extract, chunk and serialise every supported file in a directory.

    Scans ``sources_dir`` (non-recursively) for files whose extension is a
    key of ``SUPPORTED_EXTENSIONS``, extracts their text, splits it with a
    ``RecursiveCharacterTextSplitter`` and writes all chunks to
    ``output_file`` as a JSON list. Each entry has the shape::

        {"page_content": str,
         "metadata": {"source_document_name": str,
                      "chunk_index": int,          # 0-based
                      "full_location": str}}       # "<file>, Chunk <1-based>"

    Args:
        sources_dir: Directory containing the source documents.
        output_file: Path of the JSON file to write; a bare filename is
            written to the current working directory.
        chunk_size: Chunk size passed to the text splitter.
        chunk_overlap: Overlap between consecutive chunks passed to the
            text splitter.
        text_output_dir: If given, the raw extracted text of each file is
            also saved there as ``<original filename>.txt``.

    Raises:
        FileNotFoundError: If ``sources_dir`` is not an existing directory.
    """
    if not os.path.isdir(sources_dir):
        logger.error(f"Source directory not found: '{sources_dir}'")
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")

    logger.info(f"Starting chunking process. Sources: '{sources_dir}', Output: '{output_file}'")

    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info(f"Will save raw extracted text to: '{text_output_dir}'")

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # os.listdir order is arbitrary; sorting makes the output reproducible.
    for filename in sorted(os.listdir(sources_dir)):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue

        # splitext yields '' for names without a real extension, so a file
        # literally called "pdf" is not mistaken for a PDF document.
        file_ext = os.path.splitext(filename)[1][1:].lower()
        if file_ext not in SUPPORTED_EXTENSIONS:
            logger.debug(f"Skipping unsupported file: {filename}")
            continue

        logger.info(f"Processing source file: {filename}")
        text_content = SUPPORTED_EXTENSIONS[file_ext](file_path)
        if not text_content:
            logger.warning(f"Could not extract text from {filename}. Skipping.")
            continue

        if text_output_dir:
            # Saving the raw text is a debugging aid: a failure here is
            # logged but must not stop the file from being chunked.
            try:
                text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
                with open(text_output_path, 'w', encoding='utf-8') as f_text:
                    f_text.write(text_content)
                logger.info(f"Saved extracted text for '{filename}' to '{text_output_path}'")
            except Exception as e_text_save:
                logger.error(f"Could not save extracted text for '{filename}': {e_text_save}")

        chunks = text_splitter.split_text(text_content)
        if not chunks:
            logger.warning(f"No chunks generated from {filename}. Skipping.")
            continue

        all_chunks_for_json.extend(
            {
                "page_content": chunk_text,
                "metadata": {
                    "source_document_name": filename,
                    "chunk_index": i,
                    "full_location": f"{filename}, Chunk {i+1}",
                },
            }
            for i, chunk_text in enumerate(chunks)
        )
        # Only files that actually contributed chunks are counted.
        processed_files_count += 1

    if not all_chunks_for_json:
        logger.warning(f"No processable documents found or no text extracted in '{sources_dir}'. JSON file will be empty.")

    # dirname is '' for a bare filename, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when one is named.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)

    logger.info(f"Chunking complete. Processed {processed_files_count} files.")
    logger.info(f"Created a total of {len(all_chunks_for_json)} chunks.")
    logger.info(f"Chunked JSON output saved to: {output_file}")
def main():
    """Command-line entry point: parse arguments and run the chunker.

    Required options are ``--sources-dir`` and ``--output-file``; optional
    ones are ``--text-output-dir``, ``--chunk-size`` (default 1000) and
    ``--chunk-overlap`` (default 150).

    Raises:
        SystemExit: With status 1 if the chunking process raises; argparse
            itself exits with status 2 on invalid or missing arguments.
    """
    parser = argparse.ArgumentParser(description="Process source documents into a JSON file of text chunks for RAG.")
    parser.add_argument(
        '--sources-dir',
        type=str,
        required=True,
        help="The directory containing source files (PDFs, DOCX, TXT)."
    )
    parser.add_argument(
        '--output-file',
        type=str,
        required=True,
        help="The full path for the output JSON file containing the chunks."
    )
    parser.add_argument(
        '--text-output-dir',
        type=str,
        default=None,
        help="Optional: The directory to save raw extracted text files for debugging."
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=1000,
        help="The character size for each text chunk."
    )
    parser.add_argument(
        '--chunk-overlap',
        type=int,
        default=150,
        help="The character overlap between consecutive chunks."
    )
    args = parser.parse_args()

    try:
        process_sources_and_create_chunks(
            sources_dir=args.sources_dir,
            output_file=args.output_file,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            text_output_dir=args.text_output_dir,
        )
    except Exception as e:
        logger.critical(f"A critical error occurred during the chunking process: {e}", exc_info=True)
        # The builtin exit() is injected by the `site` module and is missing
        # under `python -S` or in frozen builds; SystemExit always works.
        raise SystemExit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()