import os
import logging
from typing import List, Literal
# LangChain imports for the Markdown logic
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
# Custom Core Imports
from core.ParagraphChunker import ParagraphChunker
from core.TokenChunker import TokenChunker
# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _process_markdown(file_path: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> List[Document]:
    """
    Split a Markdown file into chunks using a two-stage strategy.

    Stage 1 splits on headers (H1-H3) so each chunk stays attached to its
    section context; Stage 2 recursively re-splits any section that exceeds
    ``chunk_size`` characters.

    Args:
        file_path: Path to the Markdown file to read (UTF-8).
        chunk_size: Maximum chunk size for the recursive splitter.
        chunk_overlap: Character overlap between adjacent chunks.

    Returns:
        A list of ``Document`` chunks with ``source`` and ``file_type``
        metadata attached; an empty list if anything goes wrong.
    """
    # Headers that define the semantic sections we split on.
    header_levels = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    ]
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            raw_markdown = handle.read()

        # Stage 1: structural split on headers.
        section_docs = MarkdownHeaderTextSplitter(
            headers_to_split_on=header_levels
        ).split_text(raw_markdown)

        # Stage 2: size-bound recursive split of long sections.
        sized_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        chunks = sized_splitter.split_documents(section_docs)

        # Tag every chunk with its origin so retrieval can cite the file.
        for chunk in chunks:
            chunk.metadata['source'] = file_path
            chunk.metadata['file_type'] = 'md'

        logger.info(f"Markdown processing complete: {len(chunks)} chunks created.")
        return chunks
    except Exception as e:
        # Best-effort: log and return empty rather than break batch runs.
        logger.error(f"Error processing Markdown file {file_path}: {e}")
        return []
def process_file(
    file_path: str,
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o"  # Used for token counting in your custom classes
) -> List[Document]:
    """
    Main entry point for processing a single file.

    Routes the file to the Markdown handler or to one of the custom
    chunkers (token/paragraph) based on its extension.

    Args:
        file_path: Path to the file on disk.
        chunking_strategy: ``"paragraph"`` (semantic boundaries) or
            ``"token"`` (strict size limits).
        chunk_size: Target chunk size (used by the token chunker and
            the Markdown splitter).
        chunk_overlap: Overlap between adjacent chunks.
        model_name: Model whose tokenizer the custom chunkers use.

    Returns:
        A list of chunked ``Document`` objects; empty on any failure
        or unsupported extension.
    """
    # Guard clause: nothing to do for a missing file.
    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        return []

    ext = os.path.splitext(file_path)[1].lower()
    logger.info(f"Processing {file_path} using strategy: {chunking_strategy}")

    # Markdown gets its own header-aware pipeline.
    if ext == ".md":
        return _process_markdown(file_path, chunk_size, chunk_overlap)

    # Anything other than PDF/TXT is unsupported.
    if ext not in (".pdf", ".txt"):
        logger.warning(f"Unsupported file extension: {ext}")
        return []

    # Pick the custom chunker for PDF/TXT.
    if chunking_strategy == "token":
        chunker = TokenChunker(
            model_name=model_name,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
    else:
        # Paragraph chunker relies on semantic boundaries, not strict sizes.
        chunker = ParagraphChunker(model_name=model_name)

    try:
        if ext == ".pdf":
            # PDF path: OCREnhancedPDFLoader is used internally via BaseChunker.
            return chunker.process_document(file_path)
        elif ext == ".txt":
            # TXT path: direct text reading with paragraph preservation.
            return chunker.process_text_file(file_path)
    except Exception as e:
        logger.error(f"Error using {chunking_strategy} chunker on {file_path}: {e}")
        return []
def load_documents_from_directory(
    directory_path: str,
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o"
) -> List[Document]:
    """
    Batch helper to process every supported file under a directory tree.

    Walks ``directory_path`` recursively and runs ``process_file`` on every
    ``.pdf``, ``.txt``, and ``.md`` file found, concatenating the results.

    Args:
        directory_path: Root directory to walk.
        chunking_strategy: ``"paragraph"`` or ``"token"`` (see ``process_file``).
        chunk_size: Target chunk size, forwarded to ``process_file``.
        chunk_overlap: Overlap between chunks, forwarded to ``process_file``.
        model_name: Tokenizer model name, forwarded to ``process_file``.

    Returns:
        All chunks from all supported files, in walk order.
    """
    all_docs: List[Document] = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            # Only process supported extensions; skip everything else silently.
            if not file.lower().endswith(('.pdf', '.txt', '.md')):
                continue
            file_path = os.path.join(root, file)
            # Forward all chunking parameters so directory batches are not
            # silently pinned to process_file's defaults (previous behavior
            # with the default arguments is unchanged).
            docs = process_file(
                file_path,
                chunking_strategy=chunking_strategy,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                model_name=model_name,
            )
            all_docs.extend(docs)
    return all_docs
def list_documents(username: str = "default") -> List[str]:
"""
Lists all supported documents for a specific user.
Adjust 'source_documents' if your folder is named differently.
"""
# Define your source directory (Update this path if you use a different one!)
base_dir = "source_documents"
user_dir = os.path.join(base_dir, username)
if not os.path.exists(user_dir):
return []
files = []
for f in os.listdir(user_dir):
if f.lower().endswith(('.pdf', '.txt', '.md')):
files.append(f)
return files |