Spaces:
Running
Running
| import os | |
| import logging | |
| from typing import List, Literal | |
| # LangChain imports for the Markdown logic | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter | |
| # Custom Core Imports | |
| from core.ParagraphChunker import ParagraphChunker | |
| from core.TokenChunker import TokenChunker | |
# Configure Logging
# NOTE(review): basicConfig at import time configures the root logger for the
# whole process — confirm this module is the intended entry point, since
# importing it as a library will also apply this configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _process_markdown(file_path: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> List[Document]:
    """
    Internal helper to process Markdown files using header-based semantic splitting.

    Two-stage pipeline:
      1. Split on headers (#, ##, ###) so each chunk stays attached to its section.
      2. Recursively re-split any oversized section down to ``chunk_size``.

    Args:
        file_path: Path to the Markdown file on disk.
        chunk_size: Maximum characters per final chunk.
        chunk_overlap: Characters of overlap between adjacent chunks.

    Returns:
        A list of Documents tagged with 'source' and 'file_type' metadata,
        or an empty list if reading or splitting fails.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            markdown_text = f.read()

        # Define headers to split on (Logic: Keep context attached to the section)
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]

        # Stage 1: Split by Structure (Headers)
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        md_header_splits = markdown_splitter.split_text(markdown_text)

        # Stage 2: Split by Size (Recursively split long sections)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        final_docs = text_splitter.split_documents(md_header_splits)

        # Tag each chunk with its origin so downstream retrieval can cite the file.
        for doc in final_docs:
            doc.metadata['source'] = file_path
            doc.metadata['file_type'] = 'md'

        # Lazy %-args: the message is only formatted when INFO is enabled.
        logger.info("Markdown processing complete: %d chunks created.", len(final_docs))
        return final_docs
    except Exception as e:
        # Boundary handler: log WITH traceback (the original logger.error
        # discarded it) and degrade to an empty result rather than crashing
        # a batch run on one bad file.
        logger.exception("Error processing Markdown file %s: %s", file_path, e)
        return []
def process_file(
    file_path: str,
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o"  # Used for token counting in your custom classes
) -> List[Document]:
    """
    Main entry point for processing a single file.

    Routes to the correct custom chunker or markdown handler based on the
    file extension.

    Args:
        file_path: Path to the document to process.
        chunking_strategy: "token" uses TokenChunker with the size/overlap
            settings; "paragraph" (default) uses ParagraphChunker, which
            relies on semantic boundaries rather than strict sizes.
        chunk_size: Target chunk size (characters for .md; passed to
            TokenChunker for the "token" strategy).
        chunk_overlap: Overlap between adjacent chunks.
        model_name: Model identifier the custom chunkers use for token counting.

    Returns:
        A list of Documents; an empty list for missing files, unsupported
        extensions, or processing errors.
    """
    if not os.path.exists(file_path):
        logger.error("File not found: %s", file_path)
        return []

    file_extension = os.path.splitext(file_path)[1].lower()
    logger.info("Processing %s using strategy: %s", file_path, chunking_strategy)

    # ---------------------------------------------------------
    # 1. Handle Markdown (Specialized Logic)
    # ---------------------------------------------------------
    if file_extension == ".md":
        return _process_markdown(file_path, chunk_size, chunk_overlap)

    # ---------------------------------------------------------
    # 2. Handle PDF and TXT (Custom Core Logic)
    # ---------------------------------------------------------
    elif file_extension in (".pdf", ".txt"):
        # Initialize the appropriate Custom Chunker
        if chunking_strategy == "token":
            chunker = TokenChunker(
                model_name=model_name,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
        else:
            # Paragraph chunker relies on semantic boundaries, not strict sizes
            chunker = ParagraphChunker(model_name=model_name)

        try:
            if file_extension == ".pdf":
                # Uses OCREnhancedPDFLoader internally via BaseChunker
                return chunker.process_document(file_path)
            else:
                # .txt — direct text reading with paragraph preservation.
                # (Original used `elif == ".txt"` with no else, leaving an
                # implicit-None fall-through path; `else` closes it.)
                return chunker.process_text_file(file_path)
        except Exception as e:
            # Boundary handler: log with traceback and return an empty list
            # so one bad file does not abort a batch run.
            logger.exception(
                "Error using %s chunker on %s: %s", chunking_strategy, file_path, e
            )
            return []
    else:
        logger.warning("Unsupported file extension: %s", file_extension)
        return []
def load_documents_from_directory(
    directory_path: str,
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o",
) -> List[Document]:
    """
    Batch helper to process a directory tree of files.

    Walks ``directory_path`` recursively and runs :func:`process_file` on every
    supported file (.pdf, .txt, .md), concatenating the resulting chunks.

    Args:
        directory_path: Root directory to walk.
        chunking_strategy: Forwarded to process_file ("paragraph" or "token").
        chunk_size: Forwarded to process_file (default matches process_file);
            previously these settings could not be customized per batch.
        chunk_overlap: Forwarded to process_file.
        model_name: Forwarded to process_file for token counting.

    Returns:
        All chunks from all supported files, in os.walk order.
    """
    all_docs: List[Document] = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            # Only process supported extensions
            if file.lower().endswith(('.pdf', '.txt', '.md')):
                file_path = os.path.join(root, file)
                docs = process_file(
                    file_path,
                    chunking_strategy=chunking_strategy,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    model_name=model_name,
                )
                all_docs.extend(docs)
    return all_docs
def list_documents(username: str = "default", base_dir: str = "source_documents") -> List[str]:
    """
    List all supported documents (.pdf, .txt, .md) for a specific user.

    Args:
        username: Sub-directory of ``base_dir`` holding the user's files.
        base_dir: Root folder for per-user document directories. Defaults to
            "source_documents"; override if your layout differs (previously
            this path was hard-coded).

    Returns:
        File names (not full paths) with a supported extension, or an empty
        list when the user's directory does not exist.
    """
    user_dir = os.path.join(base_dir, username)
    if not os.path.exists(user_dir):
        return []
    # Case-insensitive extension check; preserves os.listdir order.
    return [f for f in os.listdir(user_dir) if f.lower().endswith(('.pdf', '.txt', '.md'))]