| """ |
| ============================================================ |
| 文本处理模块: Markdown 清洗 + 智能分割 (Chunking) |
| ============================================================ |
| 适配 PaddleOCR-VL-1.5 输出的 Markdown 格式文本 |
| |
| 功能: |
| 1. Markdown 文本清洗 (保留表格/公式结构) |
| 2. 基于 LangChain 的语义感知分割 |
| 3. 表格/公式专项处理 |
| """ |
|
|
| import re |
| from typing import List, Optional, Callable |
|
|
| from langchain_core.documents import Document |
|
|
| from loguru import logger |
|
|
| import config |
|
|
|
|
| |
| |
| |
| |
| |
|
|
|
|
class RecursiveCharacterTextSplitter:
    """
    Recursive character text splitter.

    Interface-compatible with
    ``langchain_text_splitters.RecursiveCharacterTextSplitter``:

    1. The text is split on the highest-priority separator it contains.
    2. Any piece still longer than ``chunk_size`` is split again with the
       lower-priority separators. Once none are left, it is cut into
       fixed-size windows instead.
    3. The pieces are greedily re-joined with a blank line into chunks of at
       most ``chunk_size``.

    Note: ``chunk_size`` is measured with ``length_function``, but the
    overlap and the fixed-size fallback windows are measured in characters.
    """

    def __init__(
        self,
        chunk_size: int = 800,
        chunk_overlap: int = 150,
        separators: Optional[List[str]] = None,
        add_start_index: bool = True,
        length_function: Callable[[str], int] = len,
        keep_separator: bool = True,
        strip_whitespace: bool = True,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Priority: paragraph > line > sentence end (full-width CJK punctuation,
        # then ASCII) > word > single character.
        self.separators = separators or [
            "\n\n", "\n", "。", "！", "？", "；", ".", "!", "?", ";", " ", "",
        ]
        self.add_start_index = add_start_index
        self.length_function = length_function
        self.keep_separator = keep_separator
        self.strip_whitespace = strip_whitespace

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split every Document; each chunk inherits its source's metadata."""
        chunks = []
        for doc in documents:
            chunks.extend(self.split_text(doc.page_content, doc.metadata))
        return chunks

    def split_text(self, text: str, metadata: Optional[dict] = None) -> List[Document]:
        """
        Split one text and return the chunks as Documents.

        Each chunk gets a shallow copy of ``metadata``. With
        ``add_start_index``, it also gets ``start_index``: the character
        offset of the chunk in ``text``. The offset is 0 when the chunk does
        not occur verbatim, e.g. because pieces were re-joined with a blank
        line or prefixed with overlap.
        """
        metadata = metadata or {}
        chunks = self._merge(self._split(text, self.separators))

        docs = []
        search_from = 0
        for chunk in chunks:
            chunk_meta = {**metadata}
            if self.add_start_index:
                # Search forward from the previous hit so repeated passages
                # (e.g. running headers) map to their own occurrence, not the first.
                index = text.find(chunk, search_from)
                if index == -1:
                    index = text.find(chunk)
                if index == -1:
                    index = 0
                else:
                    search_from = max(search_from, index + 1)
                chunk_meta["start_index"] = index
            docs.append(Document(page_content=chunk, metadata=chunk_meta))
        return docs

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """
        Split each text with its matching metadata dict (default: empty dict).

        ``texts`` and ``metadatas`` are paired with ``zip``, so extra entries
        in the longer list are ignored.
        """
        metadatas = metadatas or [{}] * len(texts)
        docs = []
        for text, meta in zip(texts, metadatas):
            docs.extend(self.split_text(text, meta))
        return docs

    def _split(self, text: str, separators: List[str]) -> List[str]:
        """
        Recursively split ``text`` into pieces no longer than ``chunk_size``.

        The first separator in ``separators`` that occurs in ``text`` is used.
        "" always matches and means "split into characters". If nothing
        matches, the last separator is used. Oversized pieces are re-split
        with the separators after the chosen one. Once none remain, they are
        cut into fixed windows by ``_force_split``.
        """
        if not separators:
            return self._force_split(text)

        sep = separators[-1]
        remaining: List[str] = []
        for pos, candidate in enumerate(separators):
            if candidate == "" or candidate in text:
                sep = candidate
                remaining = separators[pos + 1:]
                break

        if sep == "":
            pieces = list(text)
        elif self.keep_separator:
            # Re-attach the separator to the front of every piece after the first.
            head, *tail = text.split(sep)
            pieces = [head] + [sep + part for part in tail]
        else:
            pieces = text.split(sep)

        if self.strip_whitespace:
            pieces = [piece.strip() for piece in pieces]
        pieces = [piece for piece in pieces if piece]

        final_splits: List[str] = []
        for piece in pieces:
            if self.length_function(piece) <= self.chunk_size:
                final_splits.append(piece)
            elif remaining:
                final_splits.extend(self._split(piece, remaining))
            else:
                # No lower-priority separator is left: fall back to fixed windows
                # instead of recursing with an empty separator list.
                final_splits.extend(self._force_split(piece))
        return final_splits

    def _force_split(self, text: str) -> List[str]:
        """
        Last resort: cut ``text`` into ``chunk_size``-character windows.

        Consecutive windows overlap by ``chunk_overlap`` characters. An
        overlap outside ``[0, chunk_size)`` is ignored, because it would make
        the step zero (``range`` raises) or larger than the window (text is
        skipped).
        """
        size = max(1, self.chunk_size)
        overlap = self.chunk_overlap if 0 <= self.chunk_overlap < size else 0
        step = size - overlap

        chunks = []
        for start in range(0, len(text), step):
            chunk = text[start:start + size]
            if self.strip_whitespace:
                chunk = chunk.strip()
            if chunk:
                chunks.append(chunk)
        return chunks

    def _merge(self, splits: List[str]) -> List[str]:
        """
        Greedily join pieces with a blank line into chunks of at most
        ``chunk_size`` (as measured by ``length_function``).

        When a chunk is closed, up to ``chunk_overlap`` trailing characters of
        it are repeated at the start of the next chunk. The overlap is trimmed
        so the new chunk still fits. A single piece longer than ``chunk_size``
        becomes a chunk of its own.
        """
        joiner = "\n\n"
        joiner_len = self.length_function(joiner)

        chunks: List[str] = []
        current = ""
        current_len = 0
        for split in splits:
            split_len = self.length_function(split)
            if not current:
                current, current_len = split, split_len
            elif current_len + joiner_len + split_len <= self.chunk_size:
                # The joiner counts toward the size limit too.
                current += joiner + split
                current_len += joiner_len + split_len
            else:
                chunks.append(current)
                overlap = self._overlap_tail(current, split_len + joiner_len)
                current = overlap + joiner + split if overlap else split
                current_len = self.length_function(current)

        if current:
            chunks.append(current)
        return chunks

    def _overlap_tail(self, previous: str, reserved: int) -> str:
        """Return the tail of ``previous`` to repeat in the next chunk ("" if none fits)."""
        room = min(self.chunk_overlap, self.chunk_size - reserved)
        if room <= 0:
            return ""
        return previous[-room:]
|
|
|
|
| |
| |
| |
|
|
class MarkdownTextCleaner:
    """Cleaning and extraction helpers for the Markdown text emitted by PaddleOCR-VL-1.5."""

    @staticmethod
    def clean(text: str, preserve_structure: bool = True) -> str:
        """
        Clean a Markdown string.

        Steps:
        1. Strip the text and remove control characters (0x00-0x1F and
           0x7F-0x9F, except tab, LF and CR).
        2. Normalize CRLF / CR line endings to LF.
        3. With ``preserve_structure`` (the default), work line by line.
           Lines inside fenced code blocks (```), the fences themselves and
           table rows keep their indentation and only lose trailing
           whitespace. Every other line is stripped, runs of spaces are
           collapsed to one, and consecutive blank lines are reduced to one.
           Without ``preserve_structure``, runs of spaces are collapsed and
           spaces around newlines are removed.
        4. Cap runs of four or more newlines at three.

        Args:
            text: Raw Markdown text. Empty/falsy input yields "".
            preserve_structure: Keep code-block and table layout intact.

        Returns:
            The cleaned text, stripped of leading/trailing whitespace.
        """
        if not text:
            return ""

        cleaned = text.strip()
        cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', cleaned)
        cleaned = cleaned.replace('\r\n', '\n').replace('\r', '\n')

        if preserve_structure:
            cleaned_lines: List[str] = []
            in_table = False
            in_code = False

            for line in cleaned.split('\n'):
                stripped = line.strip()

                # Fences are checked first, so a fence right after a table still
                # toggles code mode, and "|" lines inside code are not tables.
                if stripped.startswith('```'):
                    in_code = not in_code
                    in_table = False
                    cleaned_lines.append(line.rstrip())
                elif in_code:
                    cleaned_lines.append(line.rstrip())
                elif stripped.startswith('|') and '|' in stripped[1:]:
                    in_table = True
                    cleaned_lines.append(line.rstrip())
                elif in_table and stripped and re.match(r'^[\s\|:\-]+$', line):
                    # Delimiter row written without a leading pipe, e.g. "---|---".
                    cleaned_lines.append(line.rstrip())
                else:
                    # Ordinary text; this also closes any open table.
                    in_table = False
                    normalized = re.sub(r' +', ' ', stripped)
                    if normalized:
                        cleaned_lines.append(normalized)
                    elif cleaned_lines and cleaned_lines[-1] != '':
                        # Keep at most one blank line in a row.
                        cleaned_lines.append('')

            cleaned = '\n'.join(cleaned_lines)
        else:
            cleaned = re.sub(r' +', ' ', cleaned)
            cleaned = re.sub(r' *\n *', '\n', cleaned)

        cleaned = re.sub(r'\n{4,}', '\n\n\n', cleaned)
        return cleaned.strip()

    @staticmethod
    def clean_documents(documents: List[Document]) -> List[Document]:
        """
        Clean every Document with ``clean`` and drop those that end up empty.

        Kept documents receive the original metadata plus ``cleaned``,
        ``original_length`` and ``cleaned_length``.
        """
        cleaned_docs = []
        for doc in documents:
            original_len = len(doc.page_content)
            cleaned_text = MarkdownTextCleaner.clean(doc.page_content)
            cleaned_len = len(cleaned_text)

            if cleaned_text:
                cleaned_docs.append(
                    Document(
                        page_content=cleaned_text,
                        metadata={
                            **doc.metadata,
                            "cleaned": True,
                            "original_length": original_len,
                            "cleaned_length": cleaned_len,
                        },
                    )
                )
            else:
                logger.debug(
                    f"页面 {doc.metadata.get('page', '?')} 清洗后为空, 已跳过"
                )

        logger.info(
            f"文本清洗: {len(documents)} → {len(cleaned_docs)} 个文档 "
            f"(移除 {len(documents) - len(cleaned_docs)} 个空白页)"
        )
        return cleaned_docs

    @staticmethod
    def extract_tables_as_chunks(documents: List[Document]) -> List[Document]:
        """
        Turn each table listed in a document's metadata into its own Document.

        Reads the parallel lists ``tables_html`` and ``tables_markdown`` from
        the metadata and prefers the Markdown form when both exist. The lists
        may differ in length or be missing; the shorter one is padded with "".
        """
        from itertools import zip_longest

        table_docs = []
        for doc in documents:
            tables_html = doc.metadata.get("tables_html") or []
            tables_md = doc.metadata.get("tables_markdown") or []

            # zip_longest so a page with only Markdown tables (or uneven lists)
            # does not silently lose tables.
            pairs = zip_longest(tables_html, tables_md, fillvalue="")
            for i, (html, md) in enumerate(pairs):
                html = html or ""
                md = md or ""
                content = md or html
                if not content.strip():
                    continue
                table_docs.append(
                    Document(
                        page_content=f"[表格数据]\n{content}",
                        metadata={
                            **doc.metadata,
                            "content_type": "table",
                            "table_index": i,
                            "table_html": html,
                            "table_markdown": md,
                        },
                    )
                )

        if table_docs:
            logger.info(f"提取了 {len(table_docs)} 个表格块")
        return table_docs

    @staticmethod
    def extract_formulas_as_chunks(documents: List[Document]) -> List[Document]:
        """Turn each non-blank LaTeX string in ``formulas_latex`` metadata into its own Document."""
        formula_docs = []
        for doc in documents:
            formulas_latex = doc.metadata.get("formulas_latex") or []
            for i, latex in enumerate(formulas_latex):
                if not latex or not latex.strip():
                    continue
                formula_docs.append(
                    Document(
                        page_content=f"[公式]\n$${latex}$$",
                        metadata={
                            **doc.metadata,
                            "content_type": "formula",
                            "formula_index": i,
                            "formula_latex": latex,
                        },
                    )
                )

        if formula_docs:
            logger.info(f"提取了 {len(formula_docs)} 个公式块")
        return formula_docs
|
|
|
|
| |
| |
| |
|
|
class DocumentSplitter:
    """
    Configurable document splitter.

    Thin wrapper around this module's ``RecursiveCharacterTextSplitter``.
    Chunk size, overlap and the separator list default to ``config``
    values. Each ``split_documents`` call logs a one-line summary. How well
    structure such as headings or tables is respected depends entirely on
    the configured separators.
    """

    def __init__(
        self,
        chunk_size: int = config.CHUNK_SIZE,
        chunk_overlap: int = config.CHUNK_OVERLAP,
        separators: Optional[List[str]] = None,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # An empty or missing list falls back to the configured separators.
        self.separators = separators if separators else config.SEPARATORS

        self._splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=self.separators,
            add_start_index=True,
            length_function=len,
            keep_separator=True,
            strip_whitespace=True,
        )

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split ``documents`` into chunks; returns [] without logging for empty input."""
        if not documents:
            return []

        pieces = self._splitter.split_documents(documents)
        summary = (
            f"文本分割: {len(documents)} → {len(pieces)} 个文本块 "
            f"(块大小={self.chunk_size}, 重叠={self.chunk_overlap})"
        )
        logger.info(summary)
        return pieces

    def split_text(self, text: str, metadata: Optional[dict] = None) -> List[Document]:
        """Split a single string; every chunk carries a copy of ``metadata``."""
        meta = metadata or {}
        return self._splitter.create_documents([text], metadatas=[meta])
|
|
|
|
class MarkdownAwareSplitter:
    """
    Markdown-structure-aware splitter.

    Splits each document before level 1-3 headings (``#`` to ``###``),
    never cutting through a fenced code block or a pipe table. It then
    greedily merges the sections into chunks of about ``target_chunk_size``
    characters. Sections shorter than ``min_chunk_size`` are merged into a
    neighbouring chunk rather than discarded, so no text is lost; a chunk
    may therefore exceed the target size.
    """

    def __init__(
        self,
        target_chunk_size: int = config.CHUNK_SIZE,
        min_chunk_size: int = 100,
    ):
        self.target_chunk_size = target_chunk_size
        self.min_chunk_size = min_chunk_size

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split every document along its Markdown structure; chunks inherit its metadata."""
        all_chunks = []
        for doc in documents:
            sections = self._split_by_headers(doc.page_content)
            chunks = self._merge_sections(
                sections, doc.metadata, self.target_chunk_size, self.min_chunk_size
            )
            all_chunks.extend(chunks)

        logger.info(
            f"Markdown 感知分割: {len(documents)} → {len(all_chunks)} 个文本块"
        )
        return all_chunks

    @staticmethod
    def _split_by_headers(text: str) -> List[str]:
        """
        Split ``text`` before every line that starts with 1-3 ``#`` plus whitespace.

        Fenced code blocks and pipe tables are temporarily replaced by
        placeholders, so heading-like lines inside them are not split points.

        Returns:
            Non-empty, stripped sections in document order.
        """
        protection_map = {}

        def protect(match):
            block = match.group(0)
            body = block.rstrip("\n")
            key = f"__PROTECTED_{len(protection_map)}__"
            protection_map[key] = body
            # Leave the trailing newline outside the placeholder. Otherwise a
            # heading right after a table loses its "\n" and is not split on.
            return key + block[len(body):]

        text = re.sub(r'```[\s\S]*?```', protect, text)
        text = re.sub(
            r'(?:^\|.+\|\n)+(?:^\|[\s\-:]+\|\n)?(?:^\|.+\|\n?)+',
            protect,
            text,
            flags=re.MULTILINE,
        )

        raw_sections = re.split(r'\n(?=#{1,3}\s)', text)

        # Restore the newest placeholders first. A table may contain a code
        # placeholder created earlier, which must be expanded after the table.
        restore_order = list(reversed(list(protection_map.items())))
        sections = []
        for section in raw_sections:
            for key, original in restore_order:
                section = section.replace(key, original)
            section = section.strip()
            if section:
                sections.append(section)
        return sections

    @staticmethod
    def _merge_sections(
        sections: List[str],
        base_metadata: dict,
        target_size: int,
        min_size: int,
    ) -> List[Document]:
        """
        Greedily merge sections into Documents of about ``target_size`` characters.

        - Sections are joined with a blank line while the result fits.
        - A pending chunk shorter than ``min_size`` is not emitted on its own
          mid-stream. It is carried into the next chunk, which keeps a short
          heading with its body.
        - A short trailing chunk is appended to the previous chunk, or emitted
          as is when it is the only one.

        Each Document gets ``chunk_sections`` (the "first-last" section
        indices) and ``chunk_type`` metadata.
        """
        chunks: List[Document] = []
        chunk_starts: List[int] = []
        current = ""
        start_idx = 0

        def emit(content: str, first: int, last: int) -> None:
            meta = {
                **base_metadata,
                "chunk_sections": f"{first}-{last}",
                "chunk_type": "markdown_semantic",
            }
            chunks.append(Document(page_content=content, metadata=meta))
            chunk_starts.append(first)

        for i, section in enumerate(sections):
            if not current:
                current, start_idx = section, i
            elif len(current) + len(section) + 2 <= target_size or len(current) < min_size:
                # Either it still fits, or the pending text is too short to stand
                # alone and must travel with the next section instead of being lost.
                current += "\n\n" + section
            else:
                emit(current, start_idx, i - 1)
                current, start_idx = section, i

        if current:
            last = len(sections) - 1
            if len(current) >= min_size or not chunks:
                emit(current, start_idx, last)
            else:
                chunks[-1].page_content += "\n\n" + current
                chunks[-1].metadata["chunk_sections"] = f"{chunk_starts[-1]}-{last}"

        return chunks
|
|
|
|
| |
| |
| |
|
|
class TextProcessingPipeline:
    """
    Text processing pipeline: clean -> extract tables/formulas -> split -> number.

    Usage:
        pipeline = TextProcessingPipeline()
        chunks = pipeline.process(raw_documents)

    ``split_method="markdown"`` selects ``MarkdownAwareSplitter``. Any other
    value selects ``DocumentSplitter``; values other than ``"recursive"``
    also log a warning.
    """

    def __init__(
        self,
        chunk_size: int = config.CHUNK_SIZE,
        chunk_overlap: int = config.CHUNK_OVERLAP,
        split_method: str = "recursive",
        extract_tables: bool = True,
        extract_formulas: bool = False,
        clean_text: bool = True,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.split_method = split_method
        self.extract_tables = extract_tables
        self.extract_formulas = extract_formulas
        self.clean_text = clean_text

        if split_method == "markdown":
            self.splitter = MarkdownAwareSplitter(
                target_chunk_size=chunk_size,
                min_chunk_size=max(50, chunk_size // 4),
            )
        else:
            if split_method != "recursive":
                # Surface typos instead of silently using the default splitter.
                logger.warning(
                    f"未知的 split_method={split_method!r}, 回退到 recursive 分割"
                )
            self.splitter = DocumentSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )

    def process(self, documents: List[Document]) -> List[Document]:
        """
        Run the full pipeline on ``documents``.

        Stages:
        1. If ``clean_text``, clean the text and drop documents that become empty.
        2. If ``extract_tables`` / ``extract_formulas``, build standalone
           table / formula chunks from the document metadata.
        3. Split the documents with the configured splitter.
        4. Append the table/formula chunks and assign sequential ``chunk_id``
           values ("chunk_000000", ...).

        Args:
            documents: Page-level Documents; any iterable is accepted.

        Returns:
            The final chunks.
        """
        docs = list(documents)
        # Count here: ``documents`` may be a one-shot iterator with no len().
        page_count = len(docs)
        logger.info(f"文本处理流水线启动: {page_count} 个原始文档")

        if self.clean_text:
            docs = MarkdownTextCleaner.clean_documents(docs)

        extra_docs = []
        if self.extract_tables:
            extra_docs.extend(MarkdownTextCleaner.extract_tables_as_chunks(docs))
        if self.extract_formulas:
            extra_docs.extend(MarkdownTextCleaner.extract_formulas_as_chunks(docs))

        chunks = self.splitter.split_documents(docs)

        if extra_docs:
            chunks.extend(extra_docs)
            logger.info(f"合并特殊块后总计: {len(chunks)} 个文本块")

        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = f"chunk_{i:06d}"

        logger.info(f"文本处理完成: {page_count} 页 → {len(chunks)} 个文本块")
        return chunks
|
|
|
|
| |
| |
| |
|
|
def process_documents(
    documents: List[Document],
    chunk_size: int = config.CHUNK_SIZE,
    chunk_overlap: int = config.CHUNK_OVERLAP,
    **kwargs,
) -> List[Document]:
    """
    Build a ``TextProcessingPipeline`` and run it once over ``documents``.

    Args:
        documents: Raw page-level Documents.
        chunk_size: Forwarded to the pipeline.
        chunk_overlap: Forwarded to the pipeline.
        **kwargs: Any other ``TextProcessingPipeline`` option, e.g.
            ``split_method``, ``extract_tables``, ``extract_formulas`` or
            ``clean_text``.

    Returns:
        The processed chunks.
    """
    options = dict(kwargs, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return TextProcessingPipeline(**options).process(documents)
|
|