WhatRepo / back_end /core /splitter.py
Krishna172912's picture
Create splitter.py
c6868fa unverified
from pathlib import Path
import json
from typing import List
from langchain_core.documents import Document
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter,
RecursiveJsonSplitter,
)
from chonkie import CodeChunker
from config import CHUNK_OVERLAP,CHUNK_SIZE,AST_BASED_SPLITTING
def custom_splitter(docs: List[Document],current_dir: Path) -> List[Document]:
all_chunks: List[Document] = []
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[("#", "H1"), ("##", "H2"), ("###", "H3")]
)
text_fallback_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
)
json_splitter = RecursiveJsonSplitter(
max_chunk_size=CHUNK_SIZE,
)
csv_splitter = RecursiveCharacterTextSplitter(
separators=["\n"],
chunk_size=CHUNK_SIZE,
chunk_overlap=0,
)
for doc in docs:
# --- FIX: Empty Files Check ---
# Skip completely empty documents to save compute time
if not doc.page_content or not doc.page_content.strip():
continue
source_str = doc.metadata.get("source", "")
if not source_str:
continue
path = Path(source_str)
ext = path.suffix.lower()
try:
repo_path = str(path.relative_to(current_dir))
except ValueError:
repo_path = str(path)
base_metadata = {
**doc.metadata,
"file_name": path.name,
"extension": ext,
"path_rel_repo": repo_path,
}
doc_chunks: List[Document] = []
# AST-based code chunking
if ext in AST_BASED_SPLITTING:
ast_chunker = CodeChunker(
language=AST_BASED_SPLITTING.get(ext),
tokenizer="character",
chunk_size=CHUNK_SIZE,
include_nodes=False,
)
try:
chonkie_chunks = ast_chunker.chunk(doc.page_content)
for chunk in chonkie_chunks:
doc_chunks.append(
Document(
page_content=chunk.text,
metadata=base_metadata.copy(),
)
)
except Exception as e:
print(
f"Warning: AST parsing failed for {path.name}. "
f"Falling back to text. Error: {e}"
)
doc_chunks = text_fallback_splitter.split_documents([doc])
# Markdown
elif ext in {".md", ".mdx"}:
md_splits = md_splitter.split_text(doc.page_content)
for split in md_splits:
split.metadata = {**base_metadata, **split.metadata}
doc_chunks = text_fallback_splitter.split_documents(md_splits)
# JSON
elif ext == ".json":
try:
parsed_data = json.loads(doc.page_content)
#------ Normalize the data: because remeber json can be in two formate one single dictionary or list of dictionary
texts_to_split = []
if isinstance(parsed_data, list):
# If it's a list, treat each item as a separate document
# This yields much better search results for RAG
for item in parsed_data:
if isinstance(item, dict):
texts_to_split.append(item)
else:
texts_to_split.append({"value": item})
elif isinstance(parsed_data, dict):
# If it's already a dict, it's safe
texts_to_split.append(parsed_data)
else:
# If it's a raw string/number/bool
texts_to_split.append({"value": parsed_data})
# ---------------------------------------------
# Create metadatas array to match the length of texts_to_split
metadatas = [base_metadata.copy() for _ in texts_to_split]
json_docs = json_splitter.create_documents(
texts=texts_to_split,
metadatas=metadatas,
)
doc_chunks.extend(json_docs)
except json.JSONDecodeError as e:
print(
f"Warning: Invalid JSON syntax in {path.name}. "
f"Falling back to text. Error: {e}"
)
doc_chunks = text_fallback_splitter.split_documents([doc])
# JSONL
elif ext == ".jsonl":
for line in doc.page_content.splitlines():
line = line.strip()
if not line:
continue
try:
line_data = json.loads(line)
# --- Normalize JSONL lines ---
if not isinstance(line_data, dict):
line_data = {"value": line_data}
json_docs = json_splitter.create_documents(
texts=[line_data],
metadatas=[base_metadata.copy()],
)
doc_chunks.extend(json_docs)
except json.JSONDecodeError as e:
print(
f"Warning: Invalid JSONL line in {path.name}. "
f"Skipping. Error: {e}"
)
# CSV / TSV
elif ext in {".csv", ".tsv"}:
lines = doc.page_content.splitlines()
if not lines:
continue
header = lines[0]
doc_chunks = csv_splitter.split_documents([doc])
for i, chunk in enumerate(doc_chunks):
if i == 0:
continue
# --- FIX: CSV Header Logic ---
# Ensure the chunk doesn't already have the header and strip leading newlines
# to prevent broken/malformed line boundaries.
if not chunk.page_content.startswith(header):
chunk.page_content = header + "\n" + chunk.page_content.lstrip()
chunk.metadata = base_metadata.copy()
# Fallback
else:
doc_chunks = text_fallback_splitter.split_documents([doc])
# ── FILE NAME INJECTION ───────────────────────────────────────────────
# Inject the file name into the text payload to give LLM Context.
for chunk in doc_chunks:
# 1. Update metadata
chunk.metadata = {**base_metadata, **chunk.metadata}
chunk.page_content = f"[FILE: {path.name}]\n\n" + chunk.page_content
all_chunks.append(chunk)
print(f"Original Files Processed : {len(docs)}")
print(f"Total Chunks Generated : {len(all_chunks)}")
return all_chunks