"""Standalone script: turn a directory of source documents into JSON text chunks.

Scans a directory for PDF, DOCX and TXT files, extracts their text, splits it
into overlapping character chunks and writes all chunks to a single JSON file.
"""

import os
import logging
import json
import argparse
from typing import List, Dict, Optional

from pypdf import PdfReader
import docx as python_docx
from langchain.text_splitter import RecursiveCharacterTextSplitter

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# --- Text Extraction Helper Functions ---
# Note: These are duplicated from llm_handling.py to make this a standalone script.
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract the plain text of a single document.

    Args:
        file_path: Path of the file to read.
        file_type: One of ``'pdf'``, ``'docx'`` or ``'txt'``.

    Returns:
        The extracted text with surrounding whitespace stripped, or ``None``
        when the type is unsupported, the document yields no text, or any
        error occurs while reading it (the error is logged, not raised).
    """
    base_name = os.path.basename(file_path)
    logger.info(f"Extracting text from {file_type.upper()} file: {base_name}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once; extraction is the expensive step.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(text + "\n" for text in page_texts if text)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # Undecodable bytes are dropped rather than aborting the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"Unsupported file type for text extraction: {file_type} for file {base_name}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"No text content extracted from {base_name}")
            return None
        return text_content.strip()
    except Exception as e:
        # Best-effort per file: one unreadable document must not stop the batch.
        logger.error(f"Error extracting text from {base_name} ({file_type.upper()}): {e}", exc_info=True)
        return None


# Maps a lower-case file extension (without the dot) to its extractor.
SUPPORTED_EXTENSIONS = {
    'pdf': lambda path: extract_text_from_file(path, 'pdf'),
    'docx': lambda path: extract_text_from_file(path, 'docx'),
    'txt': lambda path: extract_text_from_file(path, 'txt'),
}


def _save_extracted_text(text_output_dir: str, filename: str, text_content: str) -> None:
    """Write the raw extracted text to ``<text_output_dir>/<filename>.txt`` (best effort)."""
    try:
        text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
        with open(text_output_path, 'w', encoding='utf-8') as f_text:
            f_text.write(text_content)
        logger.info(f"Saved extracted text for '{filename}' to '{text_output_path}'")
    except (OSError, UnicodeError) as e_text_save:
        # The debug copy is optional; failing to write it must not stop chunking.
        logger.error(f"Could not save extracted text for '{filename}': {e_text_save}")


def _build_chunk_records(filename: str, chunks: List[str]) -> List[Dict]:
    """Wrap each chunk of one document in the JSON record structure."""
    return [
        {
            "page_content": chunk_text,
            "metadata": {
                "source_document_name": filename,
                "chunk_index": i,
                "full_location": f"{filename}, Chunk {i+1}"
            }
        }
        for i, chunk_text in enumerate(chunks)
    ]


def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None
) -> None:
    """
    Scans a directory for source files, extracts text, splits it into chunks,
    and saves the chunks to a single JSON file.
    Optionally saves the raw extracted text to a specified directory.

    Args:
        sources_dir: Directory containing the source files. Only regular files
            directly inside it are considered (no recursion).
        output_file: Path of the JSON file to write. Its parent directory is
            created if needed; a bare filename writes to the current directory.
        chunk_size: Character size passed to the text splitter.
        chunk_overlap: Character overlap passed to the text splitter.
        text_output_dir: If given, the raw text of each document is also saved
            there as ``<original filename>.txt``.

    Raises:
        FileNotFoundError: If ``sources_dir`` is not an existing directory.
    """
    if not os.path.isdir(sources_dir):
        logger.error(f"Source directory not found: '{sources_dir}'")
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")

    logger.info(f"Starting chunking process. Sources: '{sources_dir}', Output: '{output_file}'")

    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info(f"Will save raw extracted text to: '{text_output_dir}'")

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # os.listdir order is arbitrary; sort so the output is reproducible.
    for filename in sorted(os.listdir(sources_dir)):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue

        # splitext (unlike split('.')[-1]) yields no extension for names such
        # as "pdf" or ".txt", so they are not mistaken for supported documents.
        file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
        if file_ext not in SUPPORTED_EXTENSIONS:
            logger.debug(f"Skipping unsupported file: {filename}")
            continue

        logger.info(f"Processing source file: {filename}")
        text_content = SUPPORTED_EXTENSIONS[file_ext](file_path)
        if not text_content:
            logger.warning(f"Could not extract text from {filename}. Skipping.")
            continue

        if text_output_dir:
            _save_extracted_text(text_output_dir, filename, text_content)

        chunks = text_splitter.split_text(text_content)
        if not chunks:
            logger.warning(f"No chunks generated from {filename}. Skipping.")
            continue

        all_chunks_for_json.extend(_build_chunk_records(filename, chunks))
        processed_files_count += 1

    if not all_chunks_for_json:
        logger.warning(f"No processable documents found or no text extracted in '{sources_dir}'. JSON file will be empty.")

    # dirname is '' for a bare filename, and os.makedirs('') raises, so only
    # create the directory when the path actually has one.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)

    logger.info(f"Chunking complete. Processed {processed_files_count} files.")
    logger.info(f"Created a total of {len(all_chunks_for_json)} chunks.")
    logger.info(f"Chunked JSON output saved to: {output_file}")


def main():
    """Parse command-line arguments and run the chunking process.

    Exits with status 1 if the chunking process raises any exception.
    """
    parser = argparse.ArgumentParser(description="Process source documents into a JSON file of text chunks for RAG.")
    parser.add_argument(
        '--sources-dir',
        type=str,
        required=True,
        help="The directory containing source files (PDFs, DOCX, TXT)."
    )
    parser.add_argument(
        '--output-file',
        type=str,
        required=True,
        help="The full path for the output JSON file containing the chunks."
    )
    parser.add_argument(
        '--text-output-dir',
        type=str,
        default=None,
        help="Optional: The directory to save raw extracted text files for debugging."
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=1000,
        help="The character size for each text chunk."
    )
    parser.add_argument(
        '--chunk-overlap',
        type=int,
        default=150,
        help="The character overlap between consecutive chunks."
    )

    args = parser.parse_args()

    try:
        process_sources_and_create_chunks(
            sources_dir=args.sources_dir,
            output_file=args.output_file,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            text_output_dir=args.text_output_dir
        )
    except Exception as e:
        logger.critical(f"A critical error occurred during the chunking process: {e}", exc_info=True)
        # The builtin exit() is injected by the site module and may be absent
        # (python -S, frozen builds); SystemExit is always available.
        raise SystemExit(1)


if __name__ == "__main__":
    main()