"""Document chunking with markdown-aware semantic splitting."""
from typing import List
from haystack import Document
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.converters import MarkdownToDocument
import logging
import re
logger = logging.getLogger(__name__)
class SemanticChunker:
    """Chunks documents using markdown-aware semantic splitting.

    Documents are split on H2 (``##``) headers when present; each section is
    converted to plain text and tagged with a ``section`` metadata key.
    Documents without H2 headers become a single plain-text chunk.
    """

    # Pre-compiled patterns for the lightweight markdown -> plain-text pass.
    _HEADER_RE = re.compile(r'^#{1,6}\s+(.+)$', re.MULTILINE)  # any ATX header level
    _BOLD_RE = re.compile(r'\*\*(.+?)\*\*')
    _ITALIC_RE = re.compile(r'\*(.+?)\*')
    _LINK_RE = re.compile(r'\[(.+?)\]\(.+?\)')
    _BULLET_RE = re.compile(r'^[*+-]\s+', re.MULTILINE)  # *, - and + bullets
    # H2 headers delimit the semantic sections.
    _H2_RE = re.compile(r'^## (.+)$', re.MULTILINE)

    def __init__(
        self,
        chunk_size: int = 300,
        chunk_overlap: int = 50,
        min_chunk_size: int = 100,
        min_preamble_words: int = 10,
    ):
        """
        Initialize the chunker.

        Args:
            chunk_size: Split length for the fallback splitter. NOTE: with
                split_by="sentence" this counts sentences, not words; it is
                not used by the markdown header path.
            chunk_overlap: Overlap between chunks for the fallback splitter.
            min_chunk_size: Minimum split threshold for the fallback splitter.
            min_preamble_words: Minimum word count for pre-header text to be
                kept as an "Introduction" chunk.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
        self.min_preamble_words = min_preamble_words
        # Fallback splitter for non-markdown documents; retained for
        # compatibility but not invoked by chunk_documents below.
        self.splitter = DocumentSplitter(
            split_by="sentence",
            split_length=chunk_size,
            split_overlap=chunk_overlap,
            split_threshold=min_chunk_size,
        )
        # Markdown converter; retained for compatibility — the current
        # pipeline uses the regex conversion in _markdown_to_plain instead.
        self.md_converter = MarkdownToDocument()

    def _markdown_to_plain(self, markdown_text: str) -> str:
        """Convert markdown to plain text, removing common formatting.

        Strips ATX headers of any level (# through ######), bold/italic
        emphasis, inline links (keeping the link text), and bullet markers.
        """
        text = markdown_text
        # Drop header markers of any level but keep the heading text.
        text = self._HEADER_RE.sub(r'\1', text)
        # Bold before italic so ** is not half-consumed by the single-* rule.
        text = self._BOLD_RE.sub(r'\1', text)
        text = self._ITALIC_RE.sub(r'\1', text)
        # Replace [text](url) links with just the link text.
        text = self._LINK_RE.sub(r'\1', text)
        # Drop leading bullet markers (*, -, +).
        text = self._BULLET_RE.sub('', text)
        return text.strip()

    def _split_by_markdown_headers(self, doc: Document) -> List[Document]:
        """Split a document by markdown H2 headers (##), then convert to plain text.

        Returns one Document per section — plus an "Introduction" chunk for a
        sufficiently long preamble — each carrying the source metadata with an
        added "section" key. Falls back to a single whole-document chunk when
        no H2 headers are found.
        """
        content = doc.content or ""  # guard against documents with no content
        matches = list(self._H2_RE.finditer(content))
        if not matches:
            # No headers found: convert the whole document to plain text.
            plain_text = self._markdown_to_plain(content)
            return [Document(content=plain_text, meta=doc.meta)]

        chunks: List[Document] = []
        source_meta = doc.meta or {}  # guard against missing metadata
        doc_title = source_meta.get("file_name", "Unknown")

        # Text before the first header becomes an "Introduction" chunk if it
        # is long enough to be meaningful.
        preamble_md = content[:matches[0].start()].strip()
        if preamble_md:
            preamble_plain = self._markdown_to_plain(preamble_md)
            if len(preamble_plain.split()) >= self.min_preamble_words:
                chunk_meta = {**source_meta, "section": "Introduction"}
                chunks.append(Document(content=preamble_plain, meta=chunk_meta))

        # Each section runs from its header to the next header (or EOF).
        for i, match in enumerate(matches):
            header = match.group(1).strip()
            start = match.start()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
            section_md = content[start:end].strip()
            if section_md:
                section_plain = self._markdown_to_plain(section_md)
                logger.debug("Section '%s': %d words", header, len(section_plain.split()))
                chunk_meta = {**source_meta, "section": header}
                chunks.append(Document(content=section_plain, meta=chunk_meta))

        logger.info("Split '%s' into %d sections by markdown headers", doc_title, len(chunks))
        return chunks

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """
        Chunk documents into smaller pieces using markdown-aware splitting.

        Args:
            documents: List of documents to chunk

        Returns:
            List of chunked documents; each chunk's meta gains "chunk_id"
            (sequential index across all chunks) and "chunk_size" (word count).
        """
        if not documents:
            logger.warning("No documents to chunk")
            return []

        logger.info("Chunking %d documents with markdown-aware splitting", len(documents))

        # First, split every document by its markdown headers.
        all_chunks: List[Document] = []
        for doc in documents:
            all_chunks.extend(self._split_by_markdown_headers(doc))

        # Annotate each chunk with its position and size.
        for idx, chunk in enumerate(all_chunks):
            if chunk.meta is None:
                chunk.meta = {}
            chunk.meta["chunk_id"] = idx
            chunk.meta["chunk_size"] = len(chunk.content.split())

        logger.info("Created %d chunks from %d documents", len(all_chunks), len(documents))

        # Log size statistics for observability.
        chunk_sizes = [chunk.meta.get("chunk_size", 0) for chunk in all_chunks]
        if chunk_sizes:
            avg_size = sum(chunk_sizes) / len(chunk_sizes)
            logger.info(
                "Chunk statistics - Avg: %.1f words, Min: %d, Max: %d",
                avg_size,
                min(chunk_sizes),
                max(chunk_sizes),
            )
        return all_chunks