# AI-Agent-RAG-Bot-Test / chunker.py
# Uploaded by SakibAhmed ("Upload 14 files", revision ca6e669, verified)
import os
import logging
import json
import argparse
import csv
from typing import List, Dict, Optional
from langchain_text_splitters import RecursiveCharacterTextSplitter
from utils import extract_text_from_file, FAISS_RAG_SUPPORTED_EXTENSIONS
# --- Logging configuration ---
# Configure the root logger once at import time: INFO and above, timestamped
# records, emitted through a single stream handler.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def _csv_row_chunks(file_path: str, filename: str) -> List[Dict]:
    """Build one chunk dict per non-empty data row of a CSV file.

    Each row is rendered as ``"column: value"`` lines. Columns whose header
    or value is missing, empty, or whitespace-only are omitted, and rows with
    nothing left are skipped so that no chunk has an empty ``page_content``.
    ``chunk_index`` is the zero-based index of the data row as enumerated
    from ``csv.DictReader``, so skipped rows leave gaps in the numbering.

    Args:
        file_path: Path of the CSV file to read.
        filename: Name recorded in each chunk's metadata.

    Returns:
        The list of chunk dicts for this file (possibly empty).
    """
    row_chunks: List[Dict] = []
    # 'utf-8-sig' transparently drops a leading BOM; newline='' is what the
    # csv module requires so quoted fields with embedded newlines parse correctly.
    with open(file_path, mode='r', encoding='utf-8-sig', newline='') as f:
        for i, row in enumerate(csv.DictReader(f)):
            row_text = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k and v and str(v).strip()
            )
            if not row_text:
                continue
            row_chunks.append({
                "page_content": row_text,
                "metadata": {
                    "source_document_name": filename,
                    "chunk_index": i,
                    "full_location": f"{filename}, Row {i+1}",
                    "source_type": "csv"
                }
            })
    return row_chunks


def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None
) -> None:
    """Chunk every supported file in a directory and write the chunks as JSON.

    Regular files directly inside ``sources_dir`` (no recursion) are visited
    in sorted name order so the output is reproducible. A file is processed
    when its extension is a key of ``FAISS_RAG_SUPPORTED_EXTENSIONS``:

    * ``csv`` files are read natively, one chunk per non-empty row.
    * Any other supported file is passed to the extractor callable registered
      for its extension; the returned text is split with
      ``RecursiveCharacterTextSplitter``.

    A failure while reading or extracting a single file is logged and that
    file is skipped (it contributes no chunks); the run continues.

    Args:
        sources_dir: Directory containing the source documents.
        output_file: Path of the JSON file to write. Its parent directory is
            created if needed; a bare filename writes to the current directory.
        chunk_size: ``chunk_size`` passed to the text splitter.
        chunk_overlap: ``chunk_overlap`` passed to the text splitter.
        text_output_dir: If given, the raw extracted text of each non-CSV file
            is also saved there as ``<filename>.txt`` (best effort).

    Raises:
        FileNotFoundError: If ``sources_dir`` is not an existing directory.
    """
    if not os.path.isdir(sources_dir):
        logger.error("Source directory not found: '%s'", sources_dir)
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")

    logger.info("Starting chunking process. Sources: '%s', Output: '%s'", sources_dir, output_file)

    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info("Will save raw extracted text to: '%s'", text_output_dir)

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # os.listdir() returns entries in arbitrary order; sort for stable output.
    for filename in sorted(os.listdir(sources_dir)):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue

        # Only text after the last dot counts as an extension; a dot-less name
        # such as "csv" must not be mistaken for a file of that type.
        _, dot, suffix = filename.rpartition('.')
        file_ext = suffix.lower() if dot else ''
        if file_ext not in FAISS_RAG_SUPPORTED_EXTENSIONS:
            logger.debug("Skipping unsupported file: %s", filename)
            continue

        logger.info("Processing source file: %s", filename)

        # CSV files are handled natively, row by row.
        if file_ext == 'csv':
            try:
                csv_chunks = _csv_row_chunks(file_path, filename)
            except Exception as e:
                logger.error("Error processing CSV %s: %s", filename, e)
                continue
            all_chunks_for_json.extend(csv_chunks)
            processed_files_count += 1
            continue

        # Same per-file policy as the CSV branch: log the failure and move on.
        try:
            text_content = FAISS_RAG_SUPPORTED_EXTENSIONS[file_ext](file_path)
        except Exception as e:
            logger.error("Error extracting text from %s: %s", filename, e)
            continue

        # NOTE(review): "CSV_HANDLED_NATIVELY" is presumably a sentinel the
        # extractor returns for CSV input -- confirm against utils.
        if not text_content or text_content == "CSV_HANDLED_NATIVELY":
            continue

        if text_output_dir:
            # Saving the raw text is best effort; a failure must not stop chunking.
            try:
                text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
                with open(text_output_path, 'w', encoding='utf-8') as f_text:
                    f_text.write(text_content)
            except Exception as e_text_save:
                logger.error("Could not save extracted text for '%s': %s", filename, e_text_save)

        all_chunks_for_json.extend(
            {
                "page_content": chunk_text,
                "metadata": {
                    "source_document_name": filename,
                    "chunk_index": i,
                    "full_location": f"{filename}, Chunk {i+1}"
                }
            }
            for i, chunk_text in enumerate(text_splitter.split_text(text_content))
        )
        processed_files_count += 1

    if not all_chunks_for_json:
        logger.warning("No processable documents found in '%s'.", sources_dir)

    # dirname() is '' for a bare filename, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # The file is written even when there are no chunks (as an empty JSON list).
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)

    logger.info(
        "Chunking complete. Processed %s files. Total chunks: %s",
        processed_files_count,
        len(all_chunks_for_json),
    )
def main():
    """Command-line entry point: parse arguments and run the chunking process.

    Any exception raised by the chunking run is logged at CRITICAL level with
    its traceback and the process exits with status 1.

    Raises:
        SystemExit: With code 1 if chunking fails; argparse itself exits with
            code 2 on invalid or missing arguments.
    """
    parser = argparse.ArgumentParser(
        description="Split source documents into chunks and write them to a JSON file."
    )
    parser.add_argument('--sources-dir', type=str, required=True,
                        help="Directory containing the source documents.")
    parser.add_argument('--output-file', type=str, required=True,
                        help="Path of the JSON file to write the chunks to.")
    parser.add_argument('--text-output-dir', type=str, default=None,
                        help="Optional directory in which to save raw extracted text.")
    parser.add_argument('--chunk-size', type=int, default=1000,
                        help="Chunk size passed to the text splitter (default: 1000).")
    parser.add_argument('--chunk-overlap', type=int, default=150,
                        help="Chunk overlap passed to the text splitter (default: 150).")
    args = parser.parse_args()

    try:
        process_sources_and_create_chunks(
            sources_dir=args.sources_dir,
            output_file=args.output_file,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            text_output_dir=args.text_output_dir
        )
    except Exception as e:
        logger.critical(f"Chunking failed: {e}", exc_info=True)
        # The bare exit() helper is injected by the `site` module and is not
        # guaranteed to exist (e.g. `python -S`, frozen apps); SystemExit is
        # always available and yields the same exit status.
        raise SystemExit(1) from e
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()