# NOTE(review): removed web-page extraction residue ("Spaces: / Running / Running",
# a Hugging Face Spaces UI header) — it was never part of this script.
import argparse
import csv
import json
import logging
import os
from typing import Dict, List, Optional

from langchain_text_splitters import RecursiveCharacterTextSplitter

from utils import FAISS_RAG_SUPPORTED_EXTENSIONS, extract_text_from_file
# --- Logging Setup ---
# Configure the root logger once at import time: INFO level, timestamped
# "time - name - level - message" format, emitted via a StreamHandler
# (stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None
) -> None:
    """Extract text from every supported file in *sources_dir*, split it into
    chunks, and write the full chunk list to *output_file* as JSON.

    CSV files are chunked natively, one chunk per data row; every other
    supported extension is passed through its extractor from
    FAISS_RAG_SUPPORTED_EXTENSIONS and split with
    RecursiveCharacterTextSplitter.

    Args:
        sources_dir: Directory containing the source documents.
        output_file: Path of the JSON file that receives the chunk list.
        chunk_size: Maximum characters per chunk for the recursive splitter.
        chunk_overlap: Characters of overlap between consecutive chunks.
        text_output_dir: If given, the raw extracted text of each non-CSV
            file is also saved here as '<filename>.txt' (best-effort).

    Raises:
        FileNotFoundError: If *sources_dir* does not exist.
    """
    if not os.path.isdir(sources_dir):
        logger.error(f"Source directory not found: '{sources_dir}'")
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")
    logger.info(f"Starting chunking process. Sources: '{sources_dir}', Output: '{output_file}'")
    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info(f"Will save raw extracted text to: '{text_output_dir}'")

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # sorted() makes chunk order deterministic across runs and filesystems.
    for filename in sorted(os.listdir(sources_dir)):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue
        # splitext yields '' for extensionless files, unlike split('.')[-1]
        # which would return the whole filename as a bogus "extension".
        file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
        if file_ext not in FAISS_RAG_SUPPORTED_EXTENSIONS:
            logger.debug(f"Skipping unsupported file: {filename}")
            continue
        logger.info(f"Processing source file: {filename}")
        if file_ext == 'csv':
            # CSV is handled natively, row by row.
            csv_chunks = _chunk_csv_rows(file_path, filename)
            if csv_chunks is not None:
                all_chunks_for_json.extend(csv_chunks)
                processed_files_count += 1
        else:
            text_content = FAISS_RAG_SUPPORTED_EXTENSIONS[file_ext](file_path)
            # "CSV_HANDLED_NATIVELY" is a sentinel the extractor may return;
            # such files carry no extractable text here.
            if text_content and text_content != "CSV_HANDLED_NATIVELY":
                if text_output_dir:
                    _save_raw_text(text_output_dir, filename, text_content)
                all_chunks_for_json.extend(
                    _chunk_plain_text(text_splitter, text_content, filename))
                processed_files_count += 1

    if not all_chunks_for_json:
        logger.warning(f"No processable documents found in '{sources_dir}'.")

    output_dir = os.path.dirname(output_file)
    # dirname is '' when output_file is a bare filename; makedirs('') raises.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)
    logger.info(f"Chunking complete. Processed {processed_files_count} files. Total chunks: {len(all_chunks_for_json)}")


def _chunk_csv_rows(file_path: str, filename: str) -> Optional[List[Dict]]:
    """Read a CSV and return one chunk dict per row; None if reading fails."""
    chunks: List[Dict] = []
    try:
        with open(file_path, mode='r', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)
            for i, row in enumerate(reader):
                # One "key: value" line per non-empty cell.
                row_text = "\n".join(
                    f"{k}: {v}" for k, v in row.items() if k and v and str(v).strip())
                chunks.append({
                    "page_content": row_text,
                    "metadata": {
                        "source_document_name": filename,
                        "chunk_index": i,
                        "full_location": f"{filename}, Row {i+1}",
                        "source_type": "csv"
                    }
                })
    except Exception as e:
        logger.error(f"Error processing CSV {filename}: {e}")
        return None
    return chunks


def _save_raw_text(text_output_dir: str, filename: str, text_content: str) -> None:
    """Best-effort dump of the raw extracted text (for debugging); never raises."""
    try:
        text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
        with open(text_output_path, 'w', encoding='utf-8') as f_text:
            f_text.write(text_content)
    except Exception as e_text_save:
        logger.error(f"Could not save extracted text for '{filename}': {e_text_save}")


def _chunk_plain_text(text_splitter, text_content: str, filename: str) -> List[Dict]:
    """Split extracted text and wrap each piece in the chunk-dict schema."""
    return [
        {
            "page_content": chunk_text,
            "metadata": {
                "source_document_name": filename,
                "chunk_index": i,
                "full_location": f"{filename}, Chunk {i+1}"
            }
        }
        for i, chunk_text in enumerate(text_splitter.split_text(text_content))
    ]
def main() -> None:
    """CLI entry point: parse arguments and run the chunking pipeline.

    Exits with status 1 (via SystemExit) if chunking raises any exception.
    """
    parser = argparse.ArgumentParser(
        description="Extract text from source documents and write JSON chunks.")
    parser.add_argument('--sources-dir', type=str, required=True,
                        help="Directory containing the source documents.")
    parser.add_argument('--output-file', type=str, required=True,
                        help="Path of the JSON chunk file to write.")
    parser.add_argument('--text-output-dir', type=str, default=None,
                        help="Optional directory for raw extracted-text dumps.")
    parser.add_argument('--chunk-size', type=int, default=1000,
                        help="Maximum characters per chunk.")
    parser.add_argument('--chunk-overlap', type=int, default=150,
                        help="Characters of overlap between consecutive chunks.")
    args = parser.parse_args()
    try:
        process_sources_and_create_chunks(
            sources_dir=args.sources_dir,
            output_file=args.output_file,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            text_output_dir=args.text_output_dir
        )
    except Exception as e:
        logger.critical(f"Chunking failed: {e}", exc_info=True)
        # raise SystemExit rather than the site-injected exit() builtin,
        # which is not guaranteed to exist in all interpreter setups.
        raise SystemExit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()