| """Markdown-aware, code-block-preserving chunker for FreeCAD wiki pages.""" |
| import re |
| import uuid |
| from typing import Any |
|
|
| import tiktoken |
| from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter |
|
|
| from src.config import CHUNK_SIZE, CHUNK_OVERLAP |
|
|
# Tokenizer behind every token count taken in this module.
_enc = tiktoken.get_encoding("cl100k_base")

# Markdown header markers to split on, each paired with the metadata key the
# header text is stored under.
_HEADERS_TO_SPLIT = [
    ("#", "h1"),
    ("##", "h2"),
    ("###", "h3"),
]

# One fenced code block: a ``` or ~~~ fence, an optional word-character
# language tag, a newline, then the shortest body up to the next fence of the
# same kind.  DOTALL lets the body span multiple lines.
_FENCE_RE = re.compile(r"(```[\w]*\n.*?```|~~~[\w]*\n.*?~~~)", re.DOTALL)

# Character-based splitter; length is measured with len(), so both limits are
# the configured values multiplied by 4.
# NOTE(review): presumably a ~4-characters-per-token approximation - confirm.
_SPLITTER = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE * 4,
    chunk_overlap=CHUNK_OVERLAP * 4,
    length_function=len,
    separators=["\n\n", "\n", ". ", " ", ""],
)
|
|
|
|
def _count_tokens(text: str) -> int:
    """Return the number of tokens the module-level encoder produces for *text*.

    Special-token markers (for example ``<|endoftext|>``) are counted as
    ordinary text.  With tiktoken's default settings ``encode`` raises
    ``ValueError`` when the input contains such a marker, which would abort
    chunking of any page that merely mentions one.
    """
    # disallowed_special=() turns off the special-token check so arbitrary
    # page content can always be measured.
    return len(_enc.encode(text, disallowed_special=()))
|
|
|
|
def _protect_code_blocks(text: str) -> tuple[str, dict[str, str]]:
    """Swap every fenced code block in *text* for a unique placeholder token.

    Returns the rewritten text together with a mapping from each placeholder
    to the exact code block it stands for, so the blocks can be put back
    later.
    """
    token_to_block: dict[str, str] = {}
    pieces: list[str] = []
    cursor = 0
    for match in _FENCE_RE.finditer(text):
        # A fresh random token per block keeps placeholders distinct.
        token = f"__CODEBLOCK_{uuid.uuid4().hex}__"
        token_to_block[token] = match.group(0)
        # Copy the untouched text before the block, then the token itself.
        pieces.append(text[cursor:match.start()])
        pieces.append(token)
        cursor = match.end()
    # Whatever follows the last block (or the whole text if none matched).
    pieces.append(text[cursor:])
    return "".join(pieces), token_to_block
|
|
|
|
| def _restore_code_blocks(text: str, placeholders: dict[str, str]) -> str: |
| for key, code in placeholders.items(): |
| text = text.replace(key, code) |
| return text |
|
|
|
|
| def _classify(text: str) -> str: |
| has_code = bool(_FENCE_RE.search(text)) or bool(re.search(r"^\s{4}", text, re.MULTILINE)) |
| has_prose = bool(re.search(r"[a-zA-Z]{20,}", text)) |
| if has_code and has_prose: |
| return "mixed" |
| if has_code: |
| return "code" |
| return "text" |
|
|
|
|
def chunk_page(page: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Break a single wiki page dict into embedding-ready chunk dicts.

    Reads the page keys raw_text, page_title, source_file and source_url.
    The text is split on Markdown headers with fenced code blocks masked;
    any section above CHUNK_SIZE tokens is split further by the character
    splitter, again with code blocks masked so they are not cut.  Pieces
    under 30 tokens are dropped.  Each surviving piece is prefixed with a
    "[Page: ... | Section: ...]" line and returned with its source fields,
    section label, type, token count and character count.
    """
    masked_page, page_blocks = _protect_code_blocks(page["raw_text"])

    by_header = MarkdownHeaderTextSplitter(
        headers_to_split_on=_HEADERS_TO_SPLIT,
        strip_headers=False,
    )

    results: list[dict[str, Any]] = []
    for section in by_header.split_text(masked_page):
        body = _restore_code_blocks(section.page_content, page_blocks)
        headings = section.metadata

        if _count_tokens(body) > CHUNK_SIZE:
            # Too large for one chunk: mask the code again, split on
            # characters, then unmask each resulting part.
            masked_body, body_blocks = _protect_code_blocks(body)
            pieces = [
                _restore_code_blocks(part, body_blocks)
                for part in _SPLITTER.split_text(masked_body)
            ]
        else:
            pieces = [body]

        # The deepest header level available names the section.
        label = headings.get("h3") or headings.get("h2") or headings.get("h1") or ""

        for piece in pieces:
            piece = piece.strip()
            if not piece:
                continue
            if _count_tokens(piece) < 30:
                continue

            title = page["page_title"]
            if label:
                header = f"[Page: {title} | Section: {label}]\n"
            else:
                header = f"[Page: {title}]\n"
            tagged = header + piece

            results.append({
                "source_file": page["source_file"],
                "source_url": page["source_url"],
                "page_title": title,
                "section": label,
                "type": _classify(piece),
                "text": tagged,
                "token_len": _count_tokens(tagged),
                "char_len": len(tagged),
            })

    return results
|
|
|
|
def chunk_pages(pages: list[dict]) -> list[dict]:
    """Chunk every page and number the resulting chunks consecutively.

    Chunks keep page order; each one receives a ``chunk_id`` equal to its
    zero-based position in the returned list.
    """
    collected: list[dict] = []
    next_id = 0
    for page in pages:
        for chunk in chunk_page(page):
            chunk["chunk_id"] = next_id
            next_id += 1
            collected.append(chunk)
    return collected
|
|