File size: 4,033 Bytes
11ba2bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Markdown-aware, code-block-preserving chunker for FreeCAD wiki pages."""
import re
import uuid
from typing import Any

import tiktoken
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

from src.config import CHUNK_SIZE, CHUNK_OVERLAP

_enc = tiktoken.get_encoding("cl100k_base")

# Matches fenced code blocks (``` or ~~~, with optional language tag)
_FENCE_RE = re.compile(r"(```[\w]*\n.*?```|~~~[\w]*\n.*?~~~)", re.DOTALL)

_HEADERS_TO_SPLIT = [("#", "h1"), ("##", "h2"), ("###", "h3")]

_SPLITTER = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ". ", " ", ""],
    chunk_size=CHUNK_SIZE * 4,  # chars; ~4 chars per token
    chunk_overlap=CHUNK_OVERLAP * 4,
    length_function=len,
)


def _count_tokens(text: str) -> int:
    return len(_enc.encode(text))


def _protect_code_blocks(text: str) -> tuple[str, dict[str, str]]:
    """Replace fenced code blocks with stable placeholders. Returns modified text + map."""
    placeholders: dict[str, str] = {}
    def replace(m: re.Match) -> str:
        key = f"__CODEBLOCK_{uuid.uuid4().hex}__"
        placeholders[key] = m.group(0)
        return key
    return _FENCE_RE.sub(replace, text), placeholders


def _restore_code_blocks(text: str, placeholders: dict[str, str]) -> str:
    for key, code in placeholders.items():
        text = text.replace(key, code)
    return text


def _classify(text: str) -> str:
    has_code = bool(_FENCE_RE.search(text)) or bool(re.search(r"^\s{4}", text, re.MULTILINE))
    has_prose = bool(re.search(r"[a-zA-Z]{20,}", text))
    if has_code and has_prose:
        return "mixed"
    if has_code:
        return "code"
    return "text"


def chunk_page(page: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Split one wiki page dict into a list of chunk dicts ready for embedding.
    page keys: source_file, page_title, source_url, raw_text, priority
    """
    raw = page["raw_text"]
    protected, placeholders = _protect_code_blocks(raw)

    # Structural split on headers
    header_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=_HEADERS_TO_SPLIT, strip_headers=False
    )
    sections = header_splitter.split_text(protected)

    chunks: list[dict[str, Any]] = []
    for sec in sections:
        content = sec.page_content
        meta = sec.metadata  # {"h1": ..., "h2": ..., "h3": ...}

        # Restore code blocks before deciding whether to split further
        restored = _restore_code_blocks(content, placeholders)
        tok_len = _count_tokens(restored)

        if tok_len <= CHUNK_SIZE:
            candidates = [restored]
        else:
            # Re-protect code blocks for the recursive splitter
            protected2, ph2 = _protect_code_blocks(restored)
            raw_splits = _SPLITTER.split_text(protected2)
            candidates = [_restore_code_blocks(s, ph2) for s in raw_splits]

        section_label = meta.get("h3") or meta.get("h2") or meta.get("h1") or ""

        for text in candidates:
            text = text.strip()
            if not text or _count_tokens(text) < 30:
                continue

            # Build preamble for BM25/embedding quality
            preamble = f"[Page: {page['page_title']} | Section: {section_label}]\n" if section_label else f"[Page: {page['page_title']}]\n"
            full_text = preamble + text

            chunks.append({
                "source_file": page["source_file"],
                "source_url": page["source_url"],
                "page_title": page["page_title"],
                "section": section_label,
                "type": _classify(text),
                "text": full_text,
                "token_len": _count_tokens(full_text),
                "char_len": len(full_text),
            })

    return chunks


def chunk_pages(pages: list[dict]) -> list[dict]:
    all_chunks: list[dict] = []
    for page in pages:
        all_chunks.extend(chunk_page(page))
    for i, c in enumerate(all_chunks):
        c["chunk_id"] = i
    return all_chunks