| | """ |
| | Document Chunker Implementation |
| | |
| | Creates semantic chunks from document content with bounding box tracking. |
| | Includes TableAwareChunker for preserving table structure in markdown format. |
| | """ |
| |
|
| | import uuid |
| | import time |
| | import re |
| | from typing import List, Optional, Dict, Any, Tuple |
| | from dataclasses import dataclass |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| | from collections import defaultdict |
| |
|
| | from ..schemas.core import ( |
| | BoundingBox, |
| | DocumentChunk, |
| | ChunkType, |
| | LayoutRegion, |
| | LayoutType, |
| | OCRRegion, |
| | ) |
| |
|
| |
|
class ChunkerConfig(BaseModel):
    """Configuration for document chunking.

    Groups of options:
    - size constraints (max/min chars, overlap)
    - strategy and layout handling
    - special-content chunking (tables, figures, captions)
    - sentence splitting
    - table reconstruction thresholds (pixel/point distances used to
      cluster OCR cells into rows and columns)
    """

    # --- Size constraints ---
    max_chunk_chars: int = Field(
        default=1000,
        ge=100,
        description="Maximum characters per chunk"
    )
    min_chunk_chars: int = Field(
        default=50,
        ge=10,
        description="Minimum characters per chunk"
    )
    overlap_chars: int = Field(
        default=100,
        ge=0,
        description="Character overlap between chunks"
    )

    # --- Strategy / layout handling ---
    strategy: str = Field(
        default="semantic",
        description="Chunking strategy: semantic, fixed, or layout"
    )
    respect_layout: bool = Field(
        default=True,
        description="Respect layout region boundaries"
    )
    merge_small_regions: bool = Field(
        default=True,
        description="Merge small adjacent regions"
    )

    # --- Special-content chunking ---
    chunk_tables: bool = Field(
        default=True,
        description="Create separate chunks for tables"
    )
    chunk_figures: bool = Field(
        default=True,
        description="Create separate chunks for figures"
    )
    include_captions: bool = Field(
        default=True,
        description="Include captions with figures/tables"
    )

    # --- Text splitting ---
    split_on_sentences: bool = Field(
        default=True,
        description="Split on sentence boundaries when possible"
    )

    # --- Table structure reconstruction (FG-002) ---
    preserve_table_structure: bool = Field(
        default=True,
        description="Preserve table structure as markdown with structured data"
    )
    table_row_threshold: float = Field(
        default=10.0,
        description="Y-coordinate threshold for grouping cells into rows"
    )
    table_col_threshold: float = Field(
        default=20.0,
        description="X-coordinate threshold for grouping cells into columns"
    )
    detect_table_headers: bool = Field(
        default=True,
        description="Attempt to detect and mark header rows"
    )
| |
|
| |
|
| | |
# Maps each detected layout region type to the chunk type it should produce.
# Layout types missing here fall back to ChunkType.TEXT at the lookup site.
LAYOUT_TO_CHUNK_TYPE = {
    LayoutType.TEXT: ChunkType.TEXT,
    LayoutType.TITLE: ChunkType.TITLE,
    LayoutType.HEADING: ChunkType.HEADING,
    LayoutType.PARAGRAPH: ChunkType.PARAGRAPH,
    LayoutType.LIST: ChunkType.LIST_ITEM,
    LayoutType.TABLE: ChunkType.TABLE,
    LayoutType.FIGURE: ChunkType.FIGURE,
    LayoutType.CHART: ChunkType.CHART,
    LayoutType.FORMULA: ChunkType.FORMULA,
    LayoutType.CAPTION: ChunkType.CAPTION,
    LayoutType.FOOTNOTE: ChunkType.FOOTNOTE,
    LayoutType.HEADER: ChunkType.HEADER,
    LayoutType.FOOTER: ChunkType.FOOTER,
}
| |
|
| |
|
class DocumentChunker:
    """Base class for document chunkers.

    Subclasses implement :meth:`create_chunks`; the base class only holds
    the shared configuration object.
    """

    def __init__(self, config: Optional[ChunkerConfig] = None):
        # Fall back to default configuration when none is supplied.
        self.config = config or ChunkerConfig()

    def create_chunks(
        self,
        ocr_regions: List[OCRRegion],
        layout_regions: Optional[List[LayoutRegion]] = None,
        document_id: str = "",
        source_path: Optional[str] = None,
    ) -> List[DocumentChunk]:
        """
        Create chunks from OCR and layout regions.

        Args:
            ocr_regions: OCR text regions
            layout_regions: Optional layout regions
            document_id: Parent document ID
            source_path: Source file path

        Returns:
            List of DocumentChunk

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
| |
|
| |
|
class SemanticChunker(DocumentChunker):
    """
    Semantic chunker that respects document structure.

    Creates chunks based on:
    - Layout region boundaries (when layout regions are provided and
      ``respect_layout`` is enabled)
    - Semantic coherence (paragraphs, sections, sentence boundaries)
    - Size constraints (``max_chunk_chars``) with configurable overlap

    Tables and figures receive dedicated chunks when enabled in the
    configuration; tables can additionally be reconstructed into markdown
    with structured cell data.
    """
| |
|
| | def create_chunks( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | layout_regions: Optional[List[LayoutRegion]] = None, |
| | document_id: str = "", |
| | source_path: Optional[str] = None, |
| | ) -> List[DocumentChunk]: |
| | """Create semantic chunks from document content.""" |
| | if not ocr_regions: |
| | return [] |
| |
|
| | start_time = time.time() |
| | chunks = [] |
| | chunk_index = 0 |
| |
|
| | if layout_regions and self.config.respect_layout: |
| | |
| | chunks = self._chunk_by_layout( |
| | ocr_regions, layout_regions, document_id, source_path |
| | ) |
| | else: |
| | |
| | chunks = self._chunk_by_text( |
| | ocr_regions, document_id, source_path |
| | ) |
| |
|
| | |
| | for i, chunk in enumerate(chunks): |
| | chunk.sequence_index = i |
| |
|
| | logger.debug( |
| | f"Created {len(chunks)} chunks in " |
| | f"{(time.time() - start_time) * 1000:.1f}ms" |
| | ) |
| |
|
| | return chunks |
| |
|
| | def _chunk_by_layout( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | layout_regions: List[LayoutRegion], |
| | document_id: str, |
| | source_path: Optional[str], |
| | ) -> List[DocumentChunk]: |
| | """Create chunks based on layout regions.""" |
| | chunks = [] |
| |
|
| | |
| | sorted_layouts = sorted( |
| | layout_regions, |
| | key=lambda r: (r.reading_order or 0, r.bbox.y_min, r.bbox.x_min) |
| | ) |
| |
|
| | for layout in sorted_layouts: |
| | |
| | contained_ocr = self._get_contained_ocr(ocr_regions, layout) |
| |
|
| | if not contained_ocr: |
| | continue |
| |
|
| | |
| | chunk_type = LAYOUT_TO_CHUNK_TYPE.get(layout.type, ChunkType.TEXT) |
| |
|
| | |
| | if layout.type == LayoutType.TABLE and self.config.chunk_tables: |
| | chunk = self._create_table_chunk( |
| | contained_ocr, layout, document_id, source_path |
| | ) |
| | chunks.append(chunk) |
| |
|
| | elif layout.type in (LayoutType.FIGURE, LayoutType.CHART) and self.config.chunk_figures: |
| | chunk = self._create_figure_chunk( |
| | contained_ocr, layout, document_id, source_path |
| | ) |
| | chunks.append(chunk) |
| |
|
| | else: |
| | |
| | text_chunks = self._create_text_chunks( |
| | contained_ocr, layout, chunk_type, document_id, source_path |
| | ) |
| | chunks.extend(text_chunks) |
| |
|
| | return chunks |
| |
|
| | def _chunk_by_text( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | document_id: str, |
| | source_path: Optional[str], |
| | ) -> List[DocumentChunk]: |
| | """Create chunks from text without layout guidance.""" |
| | chunks = [] |
| |
|
| | |
| | sorted_regions = sorted( |
| | ocr_regions, |
| | key=lambda r: (r.page, r.bbox.y_min, r.bbox.x_min) |
| | ) |
| |
|
| | |
| | pages: Dict[int, List[OCRRegion]] = {} |
| | for r in sorted_regions: |
| | if r.page not in pages: |
| | pages[r.page] = [] |
| | pages[r.page].append(r) |
| |
|
| | |
| | for page_num in sorted(pages.keys()): |
| | page_regions = pages[page_num] |
| | page_chunks = self._split_text_regions( |
| | page_regions, document_id, source_path, page_num |
| | ) |
| | chunks.extend(page_chunks) |
| |
|
| | return chunks |
| |
|
| | def _get_contained_ocr( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | layout: LayoutRegion, |
| | ) -> List[OCRRegion]: |
| | """Get OCR regions contained within a layout region.""" |
| | contained = [] |
| | for ocr in ocr_regions: |
| | if ocr.page == layout.page: |
| | |
| | iou = layout.bbox.iou(ocr.bbox) |
| | if iou > 0.3 or layout.bbox.contains(ocr.bbox): |
| | contained.append(ocr) |
| | return contained |
| |
|
| | def _create_text_chunks( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | layout: LayoutRegion, |
| | chunk_type: ChunkType, |
| | document_id: str, |
| | source_path: Optional[str], |
| | ) -> List[DocumentChunk]: |
| | """Create text chunks from OCR regions, splitting if needed.""" |
| | chunks = [] |
| |
|
| | |
| | text = " ".join(r.text for r in ocr_regions) |
| |
|
| | |
| | avg_conf = sum(r.confidence for r in ocr_regions) / len(ocr_regions) |
| |
|
| | |
| | if len(text) <= self.config.max_chunk_chars: |
| | |
| | chunk = DocumentChunk( |
| | chunk_id=f"{document_id}_{uuid.uuid4().hex[:8]}", |
| | chunk_type=chunk_type, |
| | text=text, |
| | bbox=layout.bbox, |
| | page=layout.page, |
| | document_id=document_id, |
| | source_path=source_path, |
| | sequence_index=0, |
| | confidence=avg_conf, |
| | ) |
| | chunks.append(chunk) |
| | else: |
| | |
| | split_chunks = self._split_text( |
| | text, layout.bbox, layout.page, chunk_type, |
| | document_id, source_path, avg_conf |
| | ) |
| | chunks.extend(split_chunks) |
| |
|
| | return chunks |
| |
|
| | def _split_text( |
| | self, |
| | text: str, |
| | bbox: BoundingBox, |
| | page: int, |
| | chunk_type: ChunkType, |
| | document_id: str, |
| | source_path: Optional[str], |
| | confidence: float, |
| | ) -> List[DocumentChunk]: |
| | """Split long text into multiple chunks with overlap.""" |
| | chunks = [] |
| | max_chars = self.config.max_chunk_chars |
| | overlap = self.config.overlap_chars |
| |
|
| | |
| | if self.config.split_on_sentences: |
| | sentences = self._split_sentences(text) |
| | else: |
| | sentences = [text] |
| |
|
| | current_text = "" |
| | for sentence in sentences: |
| | if len(current_text) + len(sentence) > max_chars and current_text: |
| | |
| | chunk = DocumentChunk( |
| | chunk_id=f"{document_id}_{uuid.uuid4().hex[:8]}", |
| | chunk_type=chunk_type, |
| | text=current_text.strip(), |
| | bbox=bbox, |
| | page=page, |
| | document_id=document_id, |
| | source_path=source_path, |
| | sequence_index=len(chunks), |
| | confidence=confidence, |
| | ) |
| | chunks.append(chunk) |
| |
|
| | |
| | if overlap > 0: |
| | overlap_text = current_text[-overlap:] if len(current_text) > overlap else current_text |
| | current_text = overlap_text + " " + sentence |
| | else: |
| | current_text = sentence |
| | else: |
| | current_text += " " + sentence if current_text else sentence |
| |
|
| | |
| | if current_text.strip(): |
| | chunk = DocumentChunk( |
| | chunk_id=f"{document_id}_{uuid.uuid4().hex[:8]}", |
| | chunk_type=chunk_type, |
| | text=current_text.strip(), |
| | bbox=bbox, |
| | page=page, |
| | document_id=document_id, |
| | source_path=source_path, |
| | sequence_index=len(chunks), |
| | confidence=confidence, |
| | ) |
| | chunks.append(chunk) |
| |
|
| | return chunks |
| |
|
| | def _split_sentences(self, text: str) -> List[str]: |
| | """Split text into sentences.""" |
| | |
| | import re |
| | sentences = re.split(r'(?<=[.!?])\s+', text) |
| | return [s.strip() for s in sentences if s.strip()] |
| |
|
    def _create_table_chunk(
        self,
        ocr_regions: List[OCRRegion],
        layout: LayoutRegion,
        document_id: str,
        source_path: Optional[str],
    ) -> DocumentChunk:
        """
        Create a chunk for table content with structure preservation.

        Enhanced table handling (FG-002):
        - Reconstructs table structure from OCR regions
        - Generates markdown table representation
        - Stores structured data for SQL-like queries
        - Detects and marks header rows

        Args:
            ocr_regions: OCR regions falling inside the table layout region
            layout: The table layout region (supplies bbox, page, extra)
            document_id: Parent document ID
            source_path: Source file path

        Returns:
            A single TABLE DocumentChunk. Its text is a markdown table when
            ``preserve_table_structure`` is enabled, otherwise a flat
            pipe-joined string of the cell texts.
        """
        # No OCR text inside the table region: emit a placeholder chunk.
        if not ocr_regions:
            return DocumentChunk(
                chunk_id=f"{document_id}_table_{uuid.uuid4().hex[:8]}",
                chunk_type=ChunkType.TABLE,
                text="[Empty Table]",
                bbox=layout.bbox,
                page=layout.page,
                document_id=document_id,
                source_path=source_path,
                sequence_index=0,
                confidence=0.0,
                extra=layout.extra or {},
            )

        avg_conf = sum(r.confidence for r in ocr_regions) / len(ocr_regions)

        # Legacy path: flat pipe-joined cell text, no structure metadata.
        if not self.config.preserve_table_structure:
            text = " | ".join(r.text for r in ocr_regions)
            return DocumentChunk(
                chunk_id=f"{document_id}_table_{uuid.uuid4().hex[:8]}",
                chunk_type=ChunkType.TABLE,
                text=text,
                bbox=layout.bbox,
                page=layout.page,
                document_id=document_id,
                source_path=source_path,
                sequence_index=0,
                confidence=avg_conf,
                extra=layout.extra or {},
            )

        # Rebuild the row/column grid from the cell bounding boxes.
        table_data = self._reconstruct_table_structure(ocr_regions)

        # Human/LLM-readable representation used as the chunk text.
        markdown_table = self._table_to_markdown(
            table_data["rows"],
            table_data["headers"],
            table_data["has_header"]
        )

        # Structured metadata rides alongside the markdown so downstream
        # consumers can run cell-level queries without re-parsing.
        table_extra = {
            **(layout.extra or {}),
            "table_structure": {
                "row_count": table_data["row_count"],
                "col_count": table_data["col_count"],
                "has_header": table_data["has_header"],
                "headers": table_data["headers"],
                "cells": table_data["cells"],
                "cell_positions": table_data["cell_positions"],
            },
            "format": "markdown",
            "searchable_text": table_data["searchable_text"],
        }

        return DocumentChunk(
            chunk_id=f"{document_id}_table_{uuid.uuid4().hex[:8]}",
            chunk_type=ChunkType.TABLE,
            text=markdown_table,
            bbox=layout.bbox,
            page=layout.page,
            document_id=document_id,
            source_path=source_path,
            sequence_index=0,
            confidence=avg_conf,
            extra=table_extra,
        )
| |
|
    def _reconstruct_table_structure(
        self,
        ocr_regions: List[OCRRegion],
    ) -> Dict[str, Any]:
        """
        Reconstruct table structure from OCR regions based on spatial positions.

        Groups OCR regions into rows and columns by analyzing their bounding boxes.
        Returns structured table data for markdown generation and queries:
        rows/cells (2D list of strings), headers, has_header flag, counts,
        per-cell positions, and a flattened searchable-text string.
        """
        if not ocr_regions:
            return {
                "rows": [],
                "headers": [],
                "has_header": False,
                "row_count": 0,
                "col_count": 0,
                "cells": [],
                "cell_positions": [],
                "searchable_text": "",
            }

        # Scan order: top-to-bottom, then left-to-right.
        sorted_regions = sorted(
            ocr_regions,
            key=lambda r: (r.bbox.y_min, r.bbox.x_min)
        )

        # Cluster regions into rows: a region joins the current row while
        # its y_min stays within table_row_threshold of the row's anchor y.
        row_threshold = self.config.table_row_threshold
        rows: List[List[OCRRegion]] = []
        current_row: List[OCRRegion] = []
        current_y = None

        for region in sorted_regions:
            if current_y is None:
                current_y = region.bbox.y_min
                current_row.append(region)
            elif abs(region.bbox.y_min - current_y) <= row_threshold:
                current_row.append(region)
            else:
                if current_row:
                    # Order the finished row's cells left-to-right.
                    current_row.sort(key=lambda r: r.bbox.x_min)
                    rows.append(current_row)
                current_row = [region]
                current_y = region.bbox.y_min

        # Flush the trailing row.
        if current_row:
            current_row.sort(key=lambda r: r.bbox.x_min)
            rows.append(current_row)

        # Derive column x-ranges across all rows; fall back to the widest
        # row's cell count when no consistent columns were found.
        col_positions = self._detect_column_positions(rows)
        num_cols = len(col_positions) if col_positions else max(len(row) for row in rows)

        # Snap each row's regions into the column grid.
        cells: List[List[str]] = []
        cell_positions: List[List[Dict[str, Any]]] = []

        for row in rows:
            row_cells = self._assign_cells_to_columns(row, col_positions, num_cols)
            cells.append([cell["text"] for cell in row_cells])
            cell_positions.append([{
                "text": cell["text"],
                "bbox": cell["bbox"],
                "confidence": cell["confidence"]
            } for cell in row_cells])

        # Optionally decide whether the first row is a header.
        has_header = False
        headers: List[str] = []

        if self.config.detect_table_headers and len(cells) > 0:
            has_header, headers = self._detect_header_row(cells, rows)

        # Build a flat text rendering for full-text search: when headers
        # exist, pair each value with its column label ("Header: value").
        searchable_parts = []
        for i, row in enumerate(cells):
            if has_header and i == 0:
                searchable_parts.append("Headers: " + ", ".join(row))
            else:
                if has_header and headers:
                    for j, cell in enumerate(row):
                        if j < len(headers) and headers[j]:
                            searchable_parts.append(f"{headers[j]}: {cell}")
                        else:
                            searchable_parts.append(cell)
                else:
                    searchable_parts.extend(row)

        return {
            "rows": cells,
            "headers": headers,
            "has_header": has_header,
            "row_count": len(cells),
            "col_count": num_cols,
            "cells": cells,
            "cell_positions": cell_positions,
            "searchable_text": " | ".join(searchable_parts),
        }
| |
|
| | def _detect_column_positions( |
| | self, |
| | rows: List[List[OCRRegion]], |
| | ) -> List[Tuple[float, float]]: |
| | """ |
| | Detect consistent column boundaries from table rows. |
| | |
| | Returns list of (x_start, x_end) tuples for each column. |
| | """ |
| | if not rows: |
| | return [] |
| |
|
| | col_threshold = self.config.table_col_threshold |
| |
|
| | |
| | all_x_starts = [] |
| | for row in rows: |
| | for region in row: |
| | all_x_starts.append(region.bbox.x_min) |
| |
|
| | if not all_x_starts: |
| | return [] |
| |
|
| | |
| | all_x_starts.sort() |
| | columns = [] |
| | current_col_start = all_x_starts[0] |
| | current_col_regions = [all_x_starts[0]] |
| |
|
| | for x in all_x_starts[1:]: |
| | if x - current_col_regions[-1] <= col_threshold: |
| | current_col_regions.append(x) |
| | else: |
| | |
| | col_center = sum(current_col_regions) / len(current_col_regions) |
| | columns.append(col_center) |
| | current_col_regions = [x] |
| |
|
| | |
| | if current_col_regions: |
| | col_center = sum(current_col_regions) / len(current_col_regions) |
| | columns.append(col_center) |
| |
|
| | |
| | col_ranges = [] |
| | for i, col_x in enumerate(columns): |
| | x_start = col_x - col_threshold |
| | if i < len(columns) - 1: |
| | x_end = (col_x + columns[i + 1]) / 2 |
| | else: |
| | x_end = col_x + col_threshold * 3 |
| | col_ranges.append((x_start, x_end)) |
| |
|
| | return col_ranges |
| |
|
| | def _assign_cells_to_columns( |
| | self, |
| | row_regions: List[OCRRegion], |
| | col_positions: List[Tuple[float, float]], |
| | num_cols: int, |
| | ) -> List[Dict[str, Any]]: |
| | """ |
| | Assign OCR regions in a row to their respective columns. |
| | Handles merged cells and missing cells. |
| | """ |
| | |
| | row_cells = [ |
| | {"text": "", "bbox": None, "confidence": 0.0} |
| | for _ in range(num_cols) |
| | ] |
| |
|
| | if not col_positions: |
| | |
| | for i, region in enumerate(row_regions): |
| | if i < num_cols: |
| | row_cells[i] = { |
| | "text": region.text.strip(), |
| | "bbox": region.bbox.to_xyxy(), |
| | "confidence": region.confidence, |
| | } |
| | return row_cells |
| |
|
| | |
| | for region in row_regions: |
| | region_x = region.bbox.x_min |
| | assigned = False |
| |
|
| | for col_idx, (x_start, x_end) in enumerate(col_positions): |
| | if x_start <= region_x <= x_end: |
| | |
| | if row_cells[col_idx]["text"]: |
| | row_cells[col_idx]["text"] += " " + region.text.strip() |
| | else: |
| | row_cells[col_idx]["text"] = region.text.strip() |
| | row_cells[col_idx]["bbox"] = region.bbox.to_xyxy() |
| | row_cells[col_idx]["confidence"] = max( |
| | row_cells[col_idx]["confidence"], |
| | region.confidence |
| | ) |
| | assigned = True |
| | break |
| |
|
| | |
| | if not assigned: |
| | min_dist = float("inf") |
| | nearest_col = 0 |
| | for col_idx, (x_start, x_end) in enumerate(col_positions): |
| | col_center = (x_start + x_end) / 2 |
| | dist = abs(region_x - col_center) |
| | if dist < min_dist: |
| | min_dist = dist |
| | nearest_col = col_idx |
| |
|
| | if row_cells[nearest_col]["text"]: |
| | row_cells[nearest_col]["text"] += " " + region.text.strip() |
| | else: |
| | row_cells[nearest_col]["text"] = region.text.strip() |
| | row_cells[nearest_col]["bbox"] = region.bbox.to_xyxy() |
| | row_cells[nearest_col]["confidence"] = region.confidence |
| |
|
| | return row_cells |
| |
|
    def _detect_header_row(
        self,
        cells: List[List[str]],
        rows: List[List[OCRRegion]],
    ) -> Tuple[bool, List[str]]:
        """
        Detect if the first row is a header row.

        Heuristics used:
        - First row contains non-numeric text
        - First row text is shorter (labels vs data)
        - First row has distinct formatting (if available)

        Returns:
            (is_header, headers) — headers is the first row's cells when it
            is judged to be a header, otherwise an empty list.
        """
        # A header is only meaningful with at least one data row below it.
        if not cells or len(cells) < 2:
            return False, []

        first_row = cells[0]
        other_rows = cells[1:]

        # Fraction of first-row cells that are NOT numeric (labels).
        first_row_numeric_count = sum(
            1 for cell in first_row
            if cell and self._is_numeric(cell)
        )
        first_row_text_ratio = (len(first_row) - first_row_numeric_count) / max(len(first_row), 1)

        # Average fraction of numeric cells in the remaining (data) rows.
        other_numeric_ratios = []
        for row in other_rows:
            if row:
                numeric_count = sum(1 for cell in row if cell and self._is_numeric(cell))
                other_numeric_ratios.append(numeric_count / max(len(row), 1))

        avg_other_numeric = sum(other_numeric_ratios) / max(len(other_numeric_ratios), 1)

        # Header if the first row is mostly text AND either the body is
        # comparatively numeric or the first row is overwhelmingly textual.
        is_header = (
            first_row_text_ratio > 0.5 and
            (avg_other_numeric > first_row_text_ratio * 0.5 or first_row_text_ratio > 0.8)
        )

        # Secondary signal: header labels tend to be shorter than data cells.
        first_row_avg_len = sum(len(cell) for cell in first_row) / max(len(first_row), 1)
        other_avg_lens = [
            sum(len(cell) for cell in row) / max(len(row), 1)
            for row in other_rows
        ]
        avg_other_len = sum(other_avg_lens) / max(len(other_avg_lens), 1)

        # Noticeably shorter first row overrides the numeric heuristic.
        if first_row_avg_len < avg_other_len * 0.8:
            is_header = True

        return is_header, first_row if is_header else []
| |
|
| | def _is_numeric(self, text: str) -> bool: |
| | """Check if text is primarily numeric (including currency, percentages).""" |
| | cleaned = re.sub(r'[$€£¥%,.\s\-+()]', '', text) |
| | return cleaned.isdigit() if cleaned else False |
| |
|
| | def _table_to_markdown( |
| | self, |
| | rows: List[List[str]], |
| | headers: List[str], |
| | has_header: bool, |
| | ) -> str: |
| | """ |
| | Convert table data to markdown format. |
| | |
| | Creates a properly formatted markdown table with: |
| | - Header row (if detected) |
| | - Separator row |
| | - Data rows |
| | """ |
| | if not rows: |
| | return "[Empty Table]" |
| |
|
| | |
| | num_cols = max(len(row) for row in rows) if rows else 0 |
| | if num_cols == 0: |
| | return "[Empty Table]" |
| |
|
| | |
| | normalized_rows = [] |
| | for row in rows: |
| | normalized = row + [""] * (num_cols - len(row)) |
| | normalized_rows.append(normalized) |
| |
|
| | |
| | md_lines = [] |
| |
|
| | if has_header and headers: |
| | |
| | header_line = "| " + " | ".join(headers + [""] * (num_cols - len(headers))) + " |" |
| | separator = "| " + " | ".join(["---"] * num_cols) + " |" |
| | md_lines.append(header_line) |
| | md_lines.append(separator) |
| | data_rows = normalized_rows[1:] |
| | else: |
| | |
| | generic_headers = [f"Col{i+1}" for i in range(num_cols)] |
| | header_line = "| " + " | ".join(generic_headers) + " |" |
| | separator = "| " + " | ".join(["---"] * num_cols) + " |" |
| | md_lines.append(header_line) |
| | md_lines.append(separator) |
| | data_rows = normalized_rows |
| |
|
| | |
| | for row in data_rows: |
| | |
| | escaped_row = [cell.replace("|", "\\|") for cell in row] |
| | row_line = "| " + " | ".join(escaped_row) + " |" |
| | md_lines.append(row_line) |
| |
|
| | return "\n".join(md_lines) |
| |
|
| | def _create_figure_chunk( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | layout: LayoutRegion, |
| | document_id: str, |
| | source_path: Optional[str], |
| | ) -> DocumentChunk: |
| | """Create a chunk for figure/chart content.""" |
| | |
| | text = " ".join(r.text for r in ocr_regions) if ocr_regions else "[Figure]" |
| | avg_conf = sum(r.confidence for r in ocr_regions) / len(ocr_regions) if ocr_regions else 0.5 |
| |
|
| | chunk_type = ChunkType.CHART if layout.type == LayoutType.CHART else ChunkType.FIGURE |
| |
|
| | return DocumentChunk( |
| | chunk_id=f"{document_id}_{chunk_type.value}_{uuid.uuid4().hex[:8]}", |
| | chunk_type=chunk_type, |
| | text=text, |
| | bbox=layout.bbox, |
| | page=layout.page, |
| | document_id=document_id, |
| | source_path=source_path, |
| | sequence_index=0, |
| | confidence=avg_conf, |
| | caption=text if ocr_regions else None, |
| | ) |
| |
|
| | def _split_text_regions( |
| | self, |
| | ocr_regions: List[OCRRegion], |
| | document_id: str, |
| | source_path: Optional[str], |
| | page_num: int, |
| | ) -> List[DocumentChunk]: |
| | """Split OCR regions into chunks without layout guidance.""" |
| | if not ocr_regions: |
| | return [] |
| |
|
| | chunks = [] |
| | current_text = "" |
| | current_regions = [] |
| |
|
| | for region in ocr_regions: |
| | if len(current_text) + len(region.text) > self.config.max_chunk_chars: |
| | if current_regions: |
| | |
| | chunk = self._create_chunk_from_regions( |
| | current_regions, document_id, source_path, page_num, len(chunks) |
| | ) |
| | chunks.append(chunk) |
| |
|
| | current_text = region.text |
| | current_regions = [region] |
| | else: |
| | current_text += " " + region.text |
| | current_regions.append(region) |
| |
|
| | |
| | if current_regions: |
| | chunk = self._create_chunk_from_regions( |
| | current_regions, document_id, source_path, page_num, len(chunks) |
| | ) |
| | chunks.append(chunk) |
| |
|
| | return chunks |
| |
|
| | def _create_chunk_from_regions( |
| | self, |
| | regions: List[OCRRegion], |
| | document_id: str, |
| | source_path: Optional[str], |
| | page_num: int, |
| | sequence_index: int, |
| | ) -> DocumentChunk: |
| | """Create a chunk from a list of OCR regions.""" |
| | text = " ".join(r.text for r in regions) |
| | avg_conf = sum(r.confidence for r in regions) / len(regions) |
| |
|
| | |
| | x_min = min(r.bbox.x_min for r in regions) |
| | y_min = min(r.bbox.y_min for r in regions) |
| | x_max = max(r.bbox.x_max for r in regions) |
| | y_max = max(r.bbox.y_max for r in regions) |
| |
|
| | bbox = BoundingBox( |
| | x_min=x_min, y_min=y_min, |
| | x_max=x_max, y_max=y_max, |
| | normalized=False, |
| | ) |
| |
|
| | return DocumentChunk( |
| | chunk_id=f"{document_id}_{uuid.uuid4().hex[:8]}", |
| | chunk_type=ChunkType.TEXT, |
| | text=text, |
| | bbox=bbox, |
| | page=page_num, |
| | document_id=document_id, |
| | source_path=source_path, |
| | sequence_index=sequence_index, |
| | confidence=avg_conf, |
| | ) |
| |
|
| |
|
| | |
# Module-level singleton instance (created lazily on first use).
_document_chunker: Optional[DocumentChunker] = None


def get_document_chunker(
    config: Optional[ChunkerConfig] = None,
) -> DocumentChunker:
    """Get or create the singleton document chunker.

    Args:
        config: Configuration used only when the singleton is first created.

    Returns:
        The shared SemanticChunker instance.

    Note:
        A ``config`` supplied after the singleton exists was previously
        ignored silently; now a warning is logged so misconfiguration is
        visible, while the existing instance is still returned unchanged.
    """
    global _document_chunker
    if _document_chunker is None:
        _document_chunker = SemanticChunker(config or ChunkerConfig())
    elif config is not None:
        logger.warning(
            "get_document_chunker called with a config after initialization; "
            "keeping the existing singleton's config"
        )
    return _document_chunker
| |
|