File size: 7,185 Bytes
c6868fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from pathlib import Path
import json
from typing import List

from langchain_core.documents import Document
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    RecursiveJsonSplitter,
)
from chonkie import CodeChunker

from config import CHUNK_OVERLAP,CHUNK_SIZE,AST_BASED_SPLITTING

def _as_json_records(parsed_data) -> List[dict]:
    """Wrap parsed JSON so that every record handed to the JSON splitter is a dict."""
    if isinstance(parsed_data, list):
        # Each list item becomes its own record; non-dict items are boxed.
        return [
            item if isinstance(item, dict) else {"value": item}
            for item in parsed_data
        ]
    if isinstance(parsed_data, dict):
        return [parsed_data]
    # Bare scalar (string / number / bool / null) at the top level.
    return [{"value": parsed_data}]


def _repeat_csv_header(chunks: List[Document], header: str) -> None:
    """Prepend ``header`` in place to every chunk after the first that lacks it."""
    wanted = header.strip()
    for chunk in chunks[1:]:
        body = chunk.page_content.lstrip()
        # Compare the whole first line, not a prefix: with header "id", a data
        # row such as "identity,..." must not be mistaken for the header.
        first_line = body.partition("\n")[0].strip()
        if first_line != wanted:
            chunk.page_content = header + "\n" + body


def custom_splitter(docs: List[Document],current_dir: Path) -> List[Document]:
    """Split documents into chunks using a strategy chosen by file extension.

    The extension is taken from each document's ``metadata["source"]``:

    * extensions listed in ``AST_BASED_SPLITTING`` -> ``CodeChunker``; on any
      error the document is split as plain text instead
    * ``.md`` / ``.mdx`` -> split on ``#``/``##``/``###`` headers, then by size
    * ``.json`` -> ``RecursiveJsonSplitter``; invalid JSON is split as plain text
    * ``.jsonl`` -> one JSON record per line; invalid lines are skipped
    * ``.csv`` / ``.tsv`` -> split on newlines, header row repeated in each chunk
    * anything else -> ``RecursiveCharacterTextSplitter``

    Documents whose content is blank, or whose metadata has no ``source``,
    are skipped. Every emitted chunk is prefixed with ``[FILE: <name>]`` and
    carries ``file_name``, ``extension`` and ``path_rel_repo`` metadata.

    Args:
        docs: Documents to split.
        current_dir: Directory against which ``path_rel_repo`` is computed.
            When the source is not located under it, the source path is
            used unchanged.

    Returns:
        All chunks, in input-document order.
    """
    all_chunks: List[Document] = []

    md_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=[("#", "H1"), ("##", "H2"), ("###", "H3")]
    )

    text_fallback_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )

    json_splitter = RecursiveJsonSplitter(
        max_chunk_size=CHUNK_SIZE,
    )

    csv_splitter = RecursiveCharacterTextSplitter(
        separators=["\n"],
        chunk_size=CHUNK_SIZE,
        chunk_overlap=0,
    )

    # One CodeChunker per extension, built lazily and reused across files.
    ast_chunkers = {}

    for doc in docs:
        # Skip blank documents: there is nothing to chunk.
        if not doc.page_content or not doc.page_content.strip():
            continue

        # Without a source there is no extension to choose a strategy from.
        source_str = doc.metadata.get("source", "")
        if not source_str:
            continue

        path = Path(source_str)
        ext = path.suffix.lower()

        try:
            repo_path = str(path.relative_to(current_dir))
        except ValueError:
            # Source is not under current_dir; keep the path as given.
            repo_path = str(path)

        derived_metadata = {
            "file_name": path.name,
            "extension": ext,
            "path_rel_repo": repo_path,
        }
        base_metadata = {**doc.metadata, **derived_metadata}

        doc_chunks: List[Document] = []

        # AST-based code chunking
        if ext in AST_BASED_SPLITTING:
            try:
                # Construction happens inside the try so that an unsupported
                # language falls back to text instead of aborting the run.
                ast_chunker = ast_chunkers.get(ext)
                if ast_chunker is None:
                    ast_chunker = CodeChunker(
                        language=AST_BASED_SPLITTING.get(ext),
                        tokenizer="character",
                        chunk_size=CHUNK_SIZE,
                        include_nodes=False,
                    )
                    ast_chunkers[ext] = ast_chunker

                # Whitespace-only pieces are dropped; they would otherwise
                # become chunks holding nothing but the file-name banner.
                doc_chunks = [
                    Document(
                        page_content=piece.text,
                        metadata=base_metadata.copy(),
                    )
                    for piece in ast_chunker.chunk(doc.page_content)
                    if piece.text.strip()
                ]
            except Exception as e:
                print(
                    f"Warning: AST parsing failed for {path.name}. "
                    f"Falling back to text. Error: {e}"
                )
                doc_chunks = text_fallback_splitter.split_documents([doc])

        # Markdown
        elif ext in {".md", ".mdx"}:
            md_splits = md_splitter.split_text(doc.page_content)
            for split in md_splits:
                # Header metadata (H1/H2/H3) is layered on top of the base.
                split.metadata = {**base_metadata, **split.metadata}
            doc_chunks = text_fallback_splitter.split_documents(md_splits)

        # JSON
        elif ext == ".json":
            try:
                parsed_data = json.loads(doc.page_content)
            except json.JSONDecodeError as e:
                print(
                    f"Warning: Invalid JSON syntax in {path.name}. "
                    f"Falling back to text. Error: {e}"
                )
                doc_chunks = text_fallback_splitter.split_documents([doc])
            else:
                # Remember: a JSON file may hold a single object, a list of
                # objects, or a bare scalar. Normalize all of them to dicts.
                records = _as_json_records(parsed_data)
                doc_chunks = json_splitter.create_documents(
                    texts=records,
                    # One metadata dict per record, as create_documents expects.
                    metadatas=[base_metadata.copy() for _ in records],
                )

        # JSONL
        elif ext == ".jsonl":
            for line in doc.page_content.splitlines():
                line = line.strip()
                if not line:
                    continue

                try:
                    line_data = json.loads(line)
                except json.JSONDecodeError as e:
                    print(
                        f"Warning: Invalid JSONL line in {path.name}. "
                        f"Skipping. Error: {e}"
                    )
                    continue

                # Each line is one record; non-dict values are boxed.
                if not isinstance(line_data, dict):
                    line_data = {"value": line_data}

                doc_chunks.extend(
                    json_splitter.create_documents(
                        texts=[line_data],
                        metadatas=[base_metadata.copy()],
                    )
                )

        # CSV / TSV
        elif ext in {".csv", ".tsv"}:
            # Header is the first non-blank line; the content is known to be
            # non-blank here, so one always exists.
            header = next(
                (row for row in doc.page_content.splitlines() if row.strip()),
                "",
            )
            doc_chunks = csv_splitter.split_documents([doc])
            if header:
                _repeat_csv_header(doc_chunks, header)

        # Fallback
        else:
            doc_chunks = text_fallback_splitter.split_documents([doc])

        # ── FILE NAME INJECTION ───────────────────────────────────────────────
        # Inject the file name into the text payload to give the LLM context.
        for chunk in doc_chunks:
            # Derived keys are applied last so they take precedence in every
            # branch, even if the loader supplied keys with the same names.
            chunk.metadata = {
                **base_metadata,
                **chunk.metadata,
                **derived_metadata,
            }
            chunk.page_content = f"[FILE: {path.name}]\n\n" + chunk.page_content
            all_chunks.append(chunk)

    print(f"Original Files Processed : {len(docs)}")
    print(f"Total Chunks Generated   : {len(all_chunks)}")

    return all_chunks