Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| import json | |
| from typing import List | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import ( | |
| RecursiveCharacterTextSplitter, | |
| MarkdownHeaderTextSplitter, | |
| RecursiveJsonSplitter, | |
| ) | |
| from chonkie import CodeChunker | |
| from config import CHUNK_OVERLAP,CHUNK_SIZE,AST_BASED_SPLITTING | |
def custom_splitter(docs: List[Document], current_dir: Path) -> List[Document]:
    """Split documents into chunks using a strategy chosen by file extension.

    The extension of each document's ``metadata["source"]`` path selects the
    strategy:

    * extensions listed in ``AST_BASED_SPLITTING`` -> ``CodeChunker`` for the
      mapped language, falling back to plain text splitting on any error;
    * ``.md`` / ``.mdx`` -> split on H1-H3 headers, then size-limited by the
      text splitter;
    * ``.json`` -> parsed and split with ``RecursiveJsonSplitter`` (each item
      of a top-level list is split separately), falling back to plain text
      splitting when the content is not valid JSON;
    * ``.jsonl`` -> every non-blank line is parsed and split on its own;
      invalid lines are skipped with a warning;
    * ``.csv`` / ``.tsv`` -> split on line boundaries, with the header row
      repeated at the top of every chunk after the first;
    * anything else -> plain text splitting.

    Every chunk's metadata is extended with ``file_name``, ``extension`` and
    ``path_rel_repo``, and its text is prefixed with ``[FILE: <name>]``.

    Args:
        docs: Documents to split. Documents whose content is empty or
            whitespace-only, or that have no ``source`` metadata, are skipped.
        current_dir: Directory that ``path_rel_repo`` is computed against.
            Sources outside it keep their path as given.

    Returns:
        All generated chunks, in the order of the input documents.
    """
    all_chunks: List[Document] = []

    md_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=[("#", "H1"), ("##", "H2"), ("###", "H3")]
    )
    text_fallback_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    json_splitter = RecursiveJsonSplitter(
        max_chunk_size=CHUNK_SIZE,
    )
    csv_splitter = RecursiveCharacterTextSplitter(
        separators=["\n"],
        chunk_size=CHUNK_SIZE,
        chunk_overlap=0,
    )
    # One CodeChunker per language, created on first use and then reused, so
    # the chunker is not rebuilt for every source file.
    ast_chunkers = {}

    for doc in docs:
        # Skip completely empty documents to save compute time.
        if not doc.page_content or not doc.page_content.strip():
            continue

        source_str = doc.metadata.get("source", "")
        if not source_str:
            continue

        path = Path(source_str)
        ext = path.suffix.lower()
        try:
            repo_path = str(path.relative_to(current_dir))
        except ValueError:
            # The source is not located under current_dir; keep it as given.
            repo_path = str(path)

        base_metadata = {
            **doc.metadata,
            "file_name": path.name,
            "extension": ext,
            "path_rel_repo": repo_path,
        }
        doc_chunks: List[Document] = []

        # AST-based code chunking
        if ext in AST_BASED_SPLITTING:
            language = AST_BASED_SPLITTING.get(ext)
            try:
                # The chunker is built inside the try block so that a failure
                # to construct it (not only a failure while chunking) takes
                # the text fallback instead of aborting the whole run.
                ast_chunker = ast_chunkers.get(language)
                if ast_chunker is None:
                    ast_chunker = CodeChunker(
                        language=language,
                        tokenizer="character",
                        chunk_size=CHUNK_SIZE,
                        include_nodes=False,
                    )
                    ast_chunkers[language] = ast_chunker
                doc_chunks = [
                    Document(
                        page_content=chunk.text,
                        metadata=base_metadata.copy(),
                    )
                    for chunk in ast_chunker.chunk(doc.page_content)
                ]
            except Exception as e:
                # Deliberately broad: any chunker error degrades to text
                # splitting for this file only.
                print(
                    f"Warning: AST parsing failed for {path.name}. "
                    f"Falling back to text. Error: {e}"
                )
                doc_chunks = text_fallback_splitter.split_documents([doc])

        # Markdown
        elif ext in {".md", ".mdx"}:
            md_splits = md_splitter.split_text(doc.page_content)
            for split in md_splits:
                # Header metadata produced by the splitter wins over base keys.
                split.metadata = {**base_metadata, **split.metadata}
            doc_chunks = text_fallback_splitter.split_documents(md_splits)

        # JSON
        elif ext == ".json":
            try:
                parsed_data = json.loads(doc.page_content)
                # Normalize the payload: a JSON file may hold a single object,
                # a list, or a bare scalar. Each item of a top-level list is
                # split as its own unit; anything that is not a dict is
                # wrapped as {"value": ...} before being handed to the
                # splitter.
                items = parsed_data if isinstance(parsed_data, list) else [parsed_data]
                texts_to_split = [
                    item if isinstance(item, dict) else {"value": item}
                    for item in items
                ]
                # One metadata dict per entry in texts_to_split.
                metadatas = [base_metadata.copy() for _ in texts_to_split]
                json_docs = json_splitter.create_documents(
                    texts=texts_to_split,
                    metadatas=metadatas,
                )
                doc_chunks.extend(json_docs)
            except json.JSONDecodeError as e:
                print(
                    f"Warning: Invalid JSON syntax in {path.name}. "
                    f"Falling back to text. Error: {e}"
                )
                doc_chunks = text_fallback_splitter.split_documents([doc])

        # JSONL
        elif ext == ".jsonl":
            for line in doc.page_content.splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    line_data = json.loads(line)
                    # Wrap non-dict lines the same way as in the JSON branch.
                    if not isinstance(line_data, dict):
                        line_data = {"value": line_data}
                    json_docs = json_splitter.create_documents(
                        texts=[line_data],
                        metadatas=[base_metadata.copy()],
                    )
                    doc_chunks.extend(json_docs)
                except json.JSONDecodeError as e:
                    print(
                        f"Warning: Invalid JSONL line in {path.name}. "
                        f"Skipping. Error: {e}"
                    )

        # CSV / TSV
        elif ext in {".csv", ".tsv"}:
            # Use the first non-blank line as the header. Taking lines[0]
            # blindly yields "" when the file starts with a blank line, and
            # startswith("") is always true, so no chunk would get a header.
            header = next(
                (row for row in doc.page_content.splitlines() if row.strip()),
                "",
            )
            doc_chunks = csv_splitter.split_documents([doc])
            for i, chunk in enumerate(doc_chunks):
                if i == 0:
                    # The first chunk is left as produced by the splitter.
                    continue
                # Prepend the header unless the chunk already starts with it,
                # stripping leading whitespace so the rows begin on a clean
                # line boundary.
                if header and not chunk.page_content.startswith(header):
                    chunk.page_content = header + "\n" + chunk.page_content.lstrip()
                chunk.metadata = base_metadata.copy()

        # Fallback
        else:
            doc_chunks = text_fallback_splitter.split_documents([doc])

        # -- FILE NAME INJECTION --
        # Inject the file name into the text payload to give the LLM context.
        for chunk in doc_chunks:
            # Chunk-specific metadata (e.g. markdown headers) overrides base.
            chunk.metadata = {**base_metadata, **chunk.metadata}
            chunk.page_content = f"[FILE: {path.name}]\n\n" + chunk.page_content
            all_chunks.append(chunk)

    print(f"Original Files Processed : {len(docs)}")
    print(f"Total Chunks Generated : {len(all_chunks)}")
    return all_chunks