File size: 5,359 Bytes
c88e290
6695d4a
 
 
 
 
bc373db
c88e290
6695d4a
 
 
 
 
 
 
 
 
e0f2368
6695d4a
e0f2368
c88e290
6695d4a
 
 
 
 
 
 
 
e0f2368
 
6695d4a
 
 
c88e290
6695d4a
 
 
 
 
 
e0f2368
6695d4a
 
 
 
c88e290
6695d4a
e0f2368
7841205
 
6695d4a
7841205
c88e290
6695d4a
 
 
 
 
 
 
 
 
 
 
 
 
 
c88e290
 
6695d4a
 
 
 
 
 
 
 
 
 
 
 
 
e0f2368
6695d4a
 
 
 
 
 
 
e0f2368
6695d4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c88e290
6695d4a
 
 
e5ea137
6695d4a
 
 
 
e5ea137
6695d4a
e5ea137
6695d4a
 
 
 
 
 
 
 
e5ea137
6695d4a
 
99043ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
import logging
from typing import List, Literal

# LangChain imports for the Markdown logic
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

# Custom Core Imports
from core.ParagraphChunker import ParagraphChunker
from core.TokenChunker import TokenChunker

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def _process_markdown(file_path: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> List[Document]:
    """
    Internal helper to process Markdown files using header-based semantic splitting.

    Stage 1 splits on H1-H3 headers so each chunk stays attached to its
    section context; Stage 2 recursively splits oversized sections down to
    ``chunk_size`` characters with ``chunk_overlap`` overlap.

    Args:
        file_path: Path to the Markdown file (read as UTF-8).
        chunk_size: Maximum chunk size for the recursive splitter.
        chunk_overlap: Character overlap between adjacent chunks.

    Returns:
        List of Documents tagged with 'source' and 'file_type' metadata,
        or an empty list on any error (best-effort contract).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            markdown_text = f.read()

        # Headers to split on (keeps context attached to the section).
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]

        # Stage 1: Split by structure (headers).
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        md_header_splits = markdown_splitter.split_text(markdown_text)

        # Stage 2: Split by size (recursively split long sections).
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        final_docs = text_splitter.split_documents(md_header_splits)

        # Tag every chunk with its origin so downstream consumers can cite it.
        for doc in final_docs:
            doc.metadata['source'] = file_path
            doc.metadata['file_type'] = 'md'

        # Lazy %-args: the message is only formatted if INFO is enabled.
        logger.info("Markdown processing complete: %d chunks created.", len(final_docs))
        return final_docs

    except Exception:
        # logger.exception records the full traceback, which
        # logger.error(f"...{e}") silently discarded.
        logger.exception("Error processing Markdown file %s", file_path)
        return []

def process_file(
    file_path: str, 
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o" # Used for token counting in your custom classes
) -> List[Document]:
    """
    Main entry point for processing a single file.

    Routes to the correct handler based on extension:
    ``.md`` -> header-aware markdown splitting; ``.pdf``/``.txt`` ->
    the custom TokenChunker or ParagraphChunker.

    Args:
        file_path: Path of the file to process.
        chunking_strategy: "token" uses TokenChunker with the size/overlap
            settings; anything else falls back to ParagraphChunker.
        chunk_size: Target chunk size (tokens for TokenChunker, characters
            for the markdown splitter).
        chunk_overlap: Overlap between adjacent chunks.
        model_name: Tokenizer model name used by the custom chunkers.

    Returns:
        List of chunked Documents; empty list if the file is missing,
        unsupported, or a chunker raises (best-effort contract).
    """

    if not os.path.exists(file_path):
        logger.error("File not found: %s", file_path)
        return []

    file_extension = os.path.splitext(file_path)[1].lower()
    logger.info("Processing %s using strategy: %s", file_path, chunking_strategy)

    # ---------------------------------------------------------
    # 1. Handle Markdown (Specialized Logic)
    # ---------------------------------------------------------
    if file_extension == ".md":
        return _process_markdown(file_path, chunk_size, chunk_overlap)

    # ---------------------------------------------------------
    # 2. Handle PDF and TXT (Custom Core Logic)
    # ---------------------------------------------------------
    elif file_extension in (".pdf", ".txt"):

        # Initialize the appropriate custom chunker.
        if chunking_strategy == "token":
            chunker = TokenChunker(
                model_name=model_name,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap
            )
        else:
            # Paragraph chunker relies on semantic boundaries, not strict sizes.
            chunker = ParagraphChunker(model_name=model_name)

        # Process based on file type.
        try:
            if file_extension == ".pdf":
                # Uses OCREnhancedPDFLoader internally via BaseChunker
                return chunker.process_document(file_path)

            elif file_extension == ".txt":
                # Uses direct text reading with paragraph preservation
                return chunker.process_text_file(file_path)

        except Exception:
            # logger.exception keeps the traceback that logger.error(f"{e}") dropped.
            logger.exception(
                "Error using %s chunker on %s", chunking_strategy, file_path
            )
            return []

    else:
        logger.warning("Unsupported file extension: %s", file_extension)
        return []

def load_documents_from_directory(
    directory_path: str, 
    chunking_strategy: Literal["paragraph", "token"] = "paragraph",
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    model_name: str = "gpt-4o",
) -> List[Document]:
    """
    Batch helper: recursively walk *directory_path* and chunk every supported file.

    The chunking parameters are now forwarded to ``process_file`` (previously
    they could not be customized for batch runs). Their defaults match
    ``process_file``'s, so existing callers see identical behavior.

    Args:
        directory_path: Root directory to walk recursively.
        chunking_strategy: "paragraph" (semantic) or "token" (fixed-size).
        chunk_size: Target chunk size, forwarded to process_file.
        chunk_overlap: Overlap between chunks, forwarded to process_file.
        model_name: Tokenizer model name, forwarded to process_file.

    Returns:
        Flat list of Documents from all supported files (.pdf, .txt, .md).
    """
    all_docs: List[Document] = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            # Only process supported extensions (case-insensitive).
            if file.lower().endswith(('.pdf', '.txt', '.md')):
                file_path = os.path.join(root, file)
                all_docs.extend(process_file(
                    file_path,
                    chunking_strategy=chunking_strategy,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    model_name=model_name,
                ))

    return all_docs

def list_documents(username: str = "default") -> List[str]:
    """
    List all supported documents (.pdf, .txt, .md) for a specific user.

    Adjust 'source_documents' below if your folder is named differently.

    Args:
        username: Sub-directory under the source folder holding that
            user's files.

    Returns:
        Sorted list of matching filenames (sorting makes the result
        deterministic; os.listdir order is arbitrary). Empty list if the
        user's directory does not exist.
    """
    # Define your source directory (update this path if you use a different one!)
    base_dir = "source_documents"
    user_dir = os.path.join(base_dir, username)

    if not os.path.exists(user_dir):
        return []

    # str.endswith accepts a tuple, so one call covers every extension.
    return sorted(
        f for f in os.listdir(user_dir)
        if f.lower().endswith(('.pdf', '.txt', '.md'))
    )